hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1537330 [5/11] - in /hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/sr...
Date Wed, 30 Oct 2013 22:22:22 GMT
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Oct 30 22:21:59 2013
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hdfs.util.Light
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -89,9 +91,6 @@ public class BlockManager {
   static final Log LOG = LogFactory.getLog(BlockManager.class);
   public static final Log blockLog = NameNode.blockStateChangeLog;
 
-  /** Default load factor of map */
-  public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
-
   private static final String QUEUE_REASON_CORRUPT_STATE =
     "it has the wrong state or generation stamp";
 
@@ -243,7 +242,8 @@ public class BlockManager {
     invalidateBlocks = new InvalidateBlocks(datanodeManager);
 
     // Compute the map capacity by allocating 2% of total memory
-    blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
+    blocksMap = new BlocksMap(
+        LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
     blockplacement = BlockPlacementPolicy.getInstance(
         conf, stats, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
@@ -1258,22 +1258,19 @@ public class BlockManager {
       namesystem.writeUnlock();
     }
 
-    HashMap<Node, Node> excludedNodes
-        = new HashMap<Node, Node>();
+    final Set<Node> excludedNodes = new HashSet<Node>();
     for(ReplicationWork rw : work){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
       excludedNodes.clear();
       for (DatanodeDescriptor dn : rw.containingNodes) {
-        excludedNodes.put(dn, dn);
+        excludedNodes.add(dn);
       }
 
       // choose replication targets: NOT HOLDING THE GLOBAL LOCK
       // It is costly to extract the filename for which chooseTargets is called,
       // so for now we pass in the block collection itself.
-      rw.targets = blockplacement.chooseTarget(rw.bc,
-          rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
-          excludedNodes, rw.block.getNumBytes());
+      rw.chooseTargets(blockplacement, excludedNodes);
     }
 
     namesystem.writeLock();
@@ -1383,12 +1380,12 @@ public class BlockManager {
    * 
    * @throws IOException
    *           if the number of targets < minimum replication.
-   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
-   *      List, boolean, HashMap, long)
+   * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
+   *      List, boolean, Set, long)
    */
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
-      final HashMap<Node, Node> excludedNodes,
+      final Set<Node> excludedNodes,
       final long blocksize, List<String> favoredNodes) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors = 
         getDatanodeDescriptors(favoredNodes);
@@ -1788,6 +1785,14 @@ public class BlockManager {
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
             node, iblk, reportedState);
+        // Blocks of open files that exist only inside snapshots are also
+        // counted toward the safemode threshold, so such blocks must be
+        // reported to safemode as well. See HDFS-5283.
+        BlockInfoUnderConstruction blockUC = (BlockInfoUnderConstruction) storedBlock;
+        if (namesystem.isInSnapshot(blockUC)) {
+          int numOfReplicas = blockUC.getNumExpectedLocations();
+          namesystem.incrementSafeBlockCount(numOfReplicas);
+        }
         //and fall through to next clause
       }      
       //add replica if appropriate
@@ -3238,6 +3243,13 @@ assert storedBlock.findDatanode(dn) < 0 
       this.priority = priority;
       this.targets = null;
     }
+    
+    private void chooseTargets(BlockPlacementPolicy blockplacement,
+        Set<Node> excludedNodes) {
+      targets = blockplacement.chooseTarget(bc.getName(),
+          additionalReplRequired, srcNode, liveReplicaNodes, false,
+          excludedNodes, block.getNumBytes());
+    }
   }
 
   /**

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Wed Oct 30 22:21:59 2013
@@ -19,14 +19,15 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -55,25 +56,6 @@ public abstract class BlockPlacementPoli
    * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
    * to re-replicate a block with size <i>blocksize</i> 
    * If not, return as many as we can.
-   * 
-   * @param srcPath the file to which this chooseTargets is being invoked. 
-   * @param numOfReplicas additional number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param chosenNodes datanodes that have been chosen as targets.
-   * @param blocksize size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target 
-   * and sorted as a pipeline.
-   */
-  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
-                                             int numOfReplicas,
-                                             DatanodeDescriptor writer,
-                                             List<DatanodeDescriptor> chosenNodes,
-                                             long blocksize);
-
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
-   * to re-replicate a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
    *
    * @param srcPath the file to which this chooseTargets is being invoked.
    * @param numOfReplicas additional number of replicas wanted.
@@ -87,48 +69,22 @@ public abstract class BlockPlacementPoli
    */
   public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
                                              int numOfReplicas,
-                                             DatanodeDescriptor writer,
+                                             Node writer,
                                              List<DatanodeDescriptor> chosenNodes,
                                              boolean returnChosenNodes,
-                                             HashMap<Node, Node> excludedNodes,
+                                             Set<Node> excludedNodes,
                                              long blocksize);
-
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
-   * If not, return as many as we can.
-   * The base implemenatation extracts the pathname of the file from the
-   * specified srcBC, but this could be a costly operation depending on the
-   * file system implementation. Concrete implementations of this class should
-   * override this method to avoid this overhead.
-   * 
-   * @param srcBC block collection of file for which chooseTarget is invoked.
-   * @param numOfReplicas additional number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param chosenNodes datanodes that have been chosen as targets.
-   * @param blocksize size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target 
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(BlockCollection srcBC,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> chosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    return chooseTarget(srcBC.getName(), numOfReplicas, writer,
-                        chosenNodes, false, excludedNodes, blocksize);
-  }
   
   /**
-   * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, 
-   * HashMap, long)} with added parameter {@code favoredDatanodes}
+   * Same as {@link #chooseTarget(String, int, Node, List, boolean, 
+   * Set, long)} with added parameter {@code favoredDatanodes}
    * @param favoredNodes datanodes that should be favored as targets. This
    *          is only a hint and due to cluster state, namenode may not be 
    *          able to place the blocks on these datanodes.
    */
   DatanodeDescriptor[] chooseTarget(String src,
-      int numOfReplicas, DatanodeDescriptor writer,
-      HashMap<Node, Node> excludedNodes,
+      int numOfReplicas, Node writer,
+      Set<Node> excludedNodes,
       long blocksize, List<DatanodeDescriptor> favoredNodes) {
     // This class does not provide the functionality of placing
     // a block in favored datanodes. The implementations of this class
@@ -139,18 +95,17 @@ public abstract class BlockPlacementPoli
   }
 
   /**
-   * Verify that the block is replicated on at least minRacks different racks
-   * if there is more than minRacks rack in the system.
+   * Verify whether the block's placement meets the requirements of the placement
+   * policy, i.e. replicas are placed on no fewer than minRacks racks in the system.
    * 
    * @param srcPath the full pathname of the file to be verified
    * @param lBlk block with locations
-   * @param minRacks number of racks the block should be replicated to
-   * @return the difference between the required and the actual number of racks
-   * the block is replicated to.
+   * @param numOfReplicas replica number of file to be verified
+   * @return the result of verification
    */
-  abstract public int verifyBlockPlacement(String srcPath,
-                                           LocatedBlock lBlk,
-                                           int minRacks);
+  abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath,
+      LocatedBlock lBlk,
+      int numOfReplicas);
   /**
    * Decide whether deleting the specified replica of the block still makes 
    * the block conform to the configured block placement policy.
@@ -183,7 +138,7 @@ public abstract class BlockPlacementPoli
     
   /**
    * Get an instance of the configured Block Placement Policy based on the
-   * value of the configuration paramater dfs.block.replicator.classname.
+   * configuration property {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
    * 
    * @param conf the configuration to be used
    * @param stats an object that is used to retrieve the load on the cluster
@@ -193,12 +148,12 @@ public abstract class BlockPlacementPoli
   public static BlockPlacementPolicy getInstance(Configuration conf, 
                                                  FSClusterStats stats,
                                                  NetworkTopology clusterMap) {
-    Class<? extends BlockPlacementPolicy> replicatorClass =
-                      conf.getClass("dfs.block.replicator.classname",
-                                    BlockPlacementPolicyDefault.class,
-                                    BlockPlacementPolicy.class);
-    BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
-                                                             replicatorClass, conf);
+    final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
+        BlockPlacementPolicy.class);
+    final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
+        replicatorClass, conf);
     replicator.initialize(conf, stats, clusterMap);
     return replicator;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Oct 30 22:21:59 2013
@@ -21,8 +21,7 @@ import static org.apache.hadoop.util.Tim
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -55,7 +54,15 @@ public class BlockPlacementPolicyDefault
 
   private static final String enableDebugLogging =
     "For more information, please enable DEBUG log level on "
-    + LOG.getClass().getName();
+    + BlockPlacementPolicy.class.getName();
+
+  private static final ThreadLocal<StringBuilder> debugLoggingBuilder
+      = new ThreadLocal<StringBuilder>() {
+        @Override
+        protected StringBuilder initialValue() {
+          return new StringBuilder();
+        }
+      };
 
   protected boolean considerLoad; 
   private boolean preferLocalNode = true;
@@ -95,40 +102,25 @@ public class BlockPlacementPolicyDefault
         DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
-  protected ThreadLocal<StringBuilder> threadLocalBuilder =
-    new ThreadLocal<StringBuilder>() {
-    @Override
-    protected StringBuilder initialValue() {
-      return new StringBuilder();
-    }
-  };
-
-  @Override
-  public DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> chosenNodes,
-                                    long blocksize) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, false,
-        null, blocksize);
-  }
-
   @Override
   public DatanodeDescriptor[] chooseTarget(String srcPath,
                                     int numOfReplicas,
-                                    DatanodeDescriptor writer,
+                                    Node writer,
                                     List<DatanodeDescriptor> chosenNodes,
                                     boolean returnChosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
+                                    Set<Node> excludedNodes,
                                     long blocksize) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
         excludedNodes, blocksize);
   }
 
   @Override
-  DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
-      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
-      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+  DatanodeDescriptor[] chooseTarget(String src,
+      int numOfReplicas,
+      Node writer,
+      Set<Node> excludedNodes,
+      long blocksize,
+      List<DatanodeDescriptor> favoredNodes) {
     try {
       if (favoredNodes == null || favoredNodes.size() == 0) {
         // Favored nodes not specified, fall back to regular block placement.
@@ -137,8 +129,8 @@ public class BlockPlacementPolicyDefault
             excludedNodes, blocksize);
       }
 
-      HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
-          new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
+      Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
+          new HashSet<Node>() : new HashSet<Node>(excludedNodes);
 
       // Choose favored nodes
       List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
@@ -157,10 +149,10 @@ public class BlockPlacementPolicyDefault
               + " with favored node " + favoredNode); 
           continue;
         }
-        favoriteAndExcludedNodes.put(target, target);
+        favoriteAndExcludedNodes.add(target);
       }
 
-      if (results.size() < numOfReplicas) {        
+      if (results.size() < numOfReplicas) {
         // Not enough favored nodes, choose other nodes.
         numOfReplicas -= results.size();
         DatanodeDescriptor[] remainingTargets = 
@@ -181,18 +173,18 @@ public class BlockPlacementPolicyDefault
   }
 
   /** This is the implementation. */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
+  private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    Node writer,
                                     List<DatanodeDescriptor> chosenNodes,
                                     boolean returnChosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
+                                    Set<Node> excludedNodes,
                                     long blocksize) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return new DatanodeDescriptor[0];
+      return DatanodeDescriptor.EMPTY_ARRAY;
     }
       
     if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
+      excludedNodes = new HashSet<Node>();
     }
      
     int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
@@ -204,16 +196,15 @@ public class BlockPlacementPolicyDefault
     for (DatanodeDescriptor node:chosenNodes) {
       // add localMachine and related nodes to excludedNodes
       addToExcludedNodes(node, excludedNodes);
-      adjustExcludedNodes(excludedNodes, node);
     }
       
     if (!clusterMap.contains(writer)) {
-      writer=null;
+      writer = null;
     }
       
     boolean avoidStaleNodes = (stats != null
         && stats.isAvoidingStaleDataNodesForWrite());
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+    Node localNode = chooseTarget(numOfReplicas, writer,
         excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (!returnChosenNodes) {  
       results.removeAll(chosenNodes);
@@ -236,10 +227,20 @@ public class BlockPlacementPolicyDefault
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
     
-  /* choose <i>numOfReplicas</i> from all data nodes */
-  private DatanodeDescriptor chooseTarget(int numOfReplicas,
-                                          DatanodeDescriptor writer,
-                                          HashMap<Node, Node> excludedNodes,
+  /**
+   * choose <i>numOfReplicas</i> from all data nodes
+   * @param numOfReplicas additional number of replicas wanted
+   * @param writer the writer's machine, could be a non-DatanodeDescriptor node
+   * @param excludedNodes datanodes that should not be considered as targets
+   * @param blocksize size of the data to be written
+   * @param maxNodesPerRack max nodes allowed per rack
+   * @param results the target nodes already chosen
+   * @param avoidStaleNodes avoid stale nodes in replica choosing
+   * @return local node of writer (not chosen node)
+   */
+  private Node chooseTarget(int numOfReplicas,
+                                          Node writer,
+                                          Set<Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
                                           List<DatanodeDescriptor> results,
@@ -251,13 +252,13 @@ public class BlockPlacementPolicyDefault
       
     int numOfResults = results.size();
     boolean newBlock = (numOfResults==0);
-    if (writer == null && !newBlock) {
+    if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
       writer = results.get(0);
     }
 
     // Keep a copy of original excludedNodes
-    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
-        new HashMap<Node, Node>(excludedNodes) : null;
+    final Set<Node> oldExcludedNodes = avoidStaleNodes ? 
+        new HashSet<Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
         writer = chooseLocalNode(writer, excludedNodes, blocksize,
@@ -304,7 +305,7 @@ public class BlockPlacementPolicyDefault
         // We need to additionally exclude the nodes that were added to the 
         // result list in the successful calls to choose*() above.
         for (Node node : results) {
-          oldExcludedNodes.put(node, node);
+          oldExcludedNodes.add(node);
         }
         // Set numOfReplicas, since it can get out of sync with the result list
         // if the NotEnoughReplicasException was thrown in chooseRandom().
@@ -316,33 +317,30 @@ public class BlockPlacementPolicyDefault
     return writer;
   }
     
-  /* choose <i>localMachine</i> as the target.
+  /**
+   * Choose <i>localMachine</i> as the target.
    * if <i>localMachine</i> is not available, 
    * choose a node on the same rack
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalNode(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+                                             Set<Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
                                              List<DatanodeDescriptor> results,
                                              boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+      throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes);
-    if (preferLocalNode) {
+    if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
+      DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
-      Node oldNode = excludedNodes.put(localMachine, localMachine);
-      if (oldNode == null) { // was not in the excluded list
-        if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
-            results, avoidStaleNodes)) {
-          results.add(localMachine);
-          // add localMachine and related nodes to excludedNode
-          addToExcludedNodes(localMachine, excludedNodes);
-          return localMachine;
+      if (excludedNodes.add(localMachine)) { // was not in the excluded list
+        if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize,
+            maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
+          return localDatanode;
         }
       } 
     }      
@@ -358,26 +356,25 @@ public class BlockPlacementPolicyDefault
    * @return number of new excluded nodes
    */
   protected int addToExcludedNodes(DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes) {
-    Node node = excludedNodes.put(localMachine, localMachine);
-    return node == null?1:0;
+      Set<Node> excludedNodes) {
+    return excludedNodes.add(localMachine) ? 1 : 0;
   }
 
-  /* choose one node from the rack that <i>localMachine</i> is on.
+  /**
+   * Choose one node from the rack that <i>localMachine</i> is on.
    * if no such node is available, choose one node from the rack where
    * a second replica is on.
    * if still no such node is available, choose a random node 
    * in the cluster.
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+                                             Set<Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
                                              List<DatanodeDescriptor> results,
                                              boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
@@ -391,9 +388,7 @@ public class BlockPlacementPolicyDefault
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
+      for(DatanodeDescriptor nextNode : results) {
         if (nextNode != localMachine) {
           newLocal = nextNode;
           break;
@@ -416,7 +411,8 @@ public class BlockPlacementPolicyDefault
     }
   }
     
-  /* choose <i>numOfReplicas</i> nodes from the racks 
+  /** 
+   * Choose <i>numOfReplicas</i> nodes from the racks 
    * that <i>localMachine</i> is NOT on.
    * if not enough nodes are available, choose the remaining ones 
    * from the local rack
@@ -424,12 +420,12 @@ public class BlockPlacementPolicyDefault
     
   protected void chooseRemoteRack(int numOfReplicas,
                                 DatanodeDescriptor localMachine,
-                                HashMap<Node, Node> excludedNodes,
+                                Set<Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
                                 List<DatanodeDescriptor> results,
                                 boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+                                    throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
@@ -443,91 +439,58 @@ public class BlockPlacementPolicyDefault
     }
   }
 
-  /* Randomly choose one target from <i>nodes</i>.
-   * @return the chosen node
+  /**
+   * Randomly choose one target from the given <i>scope</i>.
+   * @return the chosen node, if there is any.
    */
-  protected DatanodeDescriptor chooseRandom(
-                                          String nodes,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results,
-                                          boolean avoidStaleNodes) 
-    throws NotEnoughReplicasException {
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    StringBuilder builder = null;
-    if (LOG.isDebugEnabled()) {
-      builder = threadLocalBuilder.get();
-      builder.setLength(0);
-      builder.append("[");
-    }
-    boolean badTarget = false;
-    while(numOfAvailableNodes > 0) {
-      DatanodeDescriptor chosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-
-      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
-      if (oldNode == null) { // chosenNode was not in the excluded list
-        numOfAvailableNodes--;
-        if (isGoodTarget(chosenNode, blocksize, 
-                maxNodesPerRack, results, avoidStaleNodes)) {
-          results.add(chosenNode);
-          // add chosenNode and related nodes to excludedNode
-          addToExcludedNodes(chosenNode, excludedNodes);
-          adjustExcludedNodes(excludedNodes, chosenNode);
-          return chosenNode;
-        } else {
-          badTarget = true;
-        }
-      }
-    }
-
-    String detail = enableDebugLogging;
-    if (LOG.isDebugEnabled()) {
-      if (badTarget && builder != null) {
-        detail = builder.append("]").toString();
-        builder.setLength(0);
-      } else detail = "";
-    }
-    throw new NotEnoughReplicasException(detail);
+  protected DatanodeDescriptor chooseRandom(String scope,
+      Set<Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes)
+          throws NotEnoughReplicasException {
+    return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
+        results, avoidStaleNodes);
   }
-    
-  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+
+  /**
+   * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
+   * @return the first chosen node, if there is any.
    */
-  protected void chooseRandom(int numOfReplicas,
-                            String nodes,
-                            HashMap<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseRandom(int numOfReplicas,
+                            String scope,
+                            Set<Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
                             List<DatanodeDescriptor> results,
                             boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+                                throws NotEnoughReplicasException {
       
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
+        scope, excludedNodes);
     StringBuilder builder = null;
     if (LOG.isDebugEnabled()) {
-      builder = threadLocalBuilder.get();
+      builder = debugLoggingBuilder.get();
       builder.setLength(0);
       builder.append("[");
     }
     boolean badTarget = false;
+    DatanodeDescriptor firstChosen = null;
     while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
       DatanodeDescriptor chosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
-      if (oldNode == null) {
+          (DatanodeDescriptor)clusterMap.chooseRandom(scope);
+      if (excludedNodes.add(chosenNode)) { //was not in the excluded list
         numOfAvailableNodes--;
 
-        if (isGoodTarget(chosenNode, blocksize, 
-              maxNodesPerRack, results, avoidStaleNodes)) {
+        int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
+            blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes);
+        if (newExcludedNodes >= 0) {
           numOfReplicas--;
-          results.add(chosenNode);
-          // add chosenNode and related nodes to excludedNode
-          int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
+          if (firstChosen == null) {
+            firstChosen = chosenNode;
+          }
           numOfAvailableNodes -= newExcludedNodes;
-          adjustExcludedNodes(excludedNodes, chosenNode);
         } else {
           badTarget = true;
         }
@@ -544,34 +507,44 @@ public class BlockPlacementPolicyDefault
       }
       throw new NotEnoughReplicasException(detail);
     }
+    
+    return firstChosen;
   }
-  
+
   /**
-   * After choosing a node to place replica, adjust excluded nodes accordingly.
-   * It should do nothing here as chosenNode is already put into exlcudeNodes, 
-   * but it can be overridden in subclass to put more related nodes into 
-   * excludedNodes.
-   * 
-   * @param excludedNodes
-   * @param chosenNode
-   */
-  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
-      Node chosenNode) {
-    // do nothing here.
+   * If the given node is a good target, add it to the result list and
+   * update the set of excluded nodes.
+   * @return -1 if the given is not a good target;
+   *         otherwise, return the number of nodes added to excludedNodes set.
+   */
+  int addIfIsGoodTarget(DatanodeDescriptor node,
+      Set<Node> excludedNodes,
+      long blockSize,
+      int maxNodesPerRack,
+      boolean considerLoad,
+      List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) {
+    if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
+        results, avoidStaleNodes)) {
+      results.add(node);
+      // add node and related nodes to excludedNode
+      return addToExcludedNodes(node, excludedNodes);
+    } else { 
+      return -1;
+    }
   }
 
-  /* judge if a node is a good target.
-   * return true if <i>node</i> has enough space, 
-   * does not have too much load, and the rack does not have too many nodes
-   */
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerRack,
-                               List<DatanodeDescriptor> results, 
-                               boolean avoidStaleNodes) {
-    return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
-        results, avoidStaleNodes);
+  private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) {
+    if (LOG.isDebugEnabled()) {
+      // build the error message for later use.
+      debugLoggingBuilder.get()
+          .append(node).append(": ")
+          .append("Node ").append(NodeBase.getPath(node))
+          .append(" is not chosen because ")
+          .append(reason);
+    }
   }
-  
+
   /**
    * Determine if a node is a good target. 
    * 
@@ -588,28 +561,20 @@ public class BlockPlacementPolicyDefault
    *         does not have too much load, 
    *         and the rack does not have too many nodes.
    */
-  protected boolean isGoodTarget(DatanodeDescriptor node,
+  private boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
                                List<DatanodeDescriptor> results,                           
                                boolean avoidStaleNodes) {
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      if(LOG.isDebugEnabled()) {
-        threadLocalBuilder.get().append(node.toString()).append(": ")
-          .append("Node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because the node is (being) decommissioned ");
-      }
+      logNodeIsNotChosen(node, "the node is (being) decommissioned ");
       return false;
     }
 
     if (avoidStaleNodes) {
       if (node.isStale(this.staleInterval)) {
-        if (LOG.isDebugEnabled()) {
-          threadLocalBuilder.get().append(node.toString()).append(": ")
-              .append("Node ").append(NodeBase.getPath(node))
-              .append(" is not chosen because the node is stale ");
-        }
+        logNodeIsNotChosen(node, "the node is stale ");
         return false;
       }
     }
@@ -618,11 +583,7 @@ public class BlockPlacementPolicyDefault
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine
     if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
-      if(LOG.isDebugEnabled()) {
-        threadLocalBuilder.get().append(node.toString()).append(": ")
-          .append("Node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because the node does not have enough space ");
-      }
+      logNodeIsNotChosen(node, "the node does not have enough space ");
       return false;
     }
       
@@ -634,11 +595,7 @@ public class BlockPlacementPolicyDefault
         avgLoad = (double)stats.getTotalLoad()/size;
       }
       if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        if(LOG.isDebugEnabled()) {
-          threadLocalBuilder.get().append(node.toString()).append(": ")
-            .append("Node ").append(NodeBase.getPath(node))
-            .append(" is not chosen because the node is too busy ");
-        }
+        logNodeIsNotChosen(node, "the node is too busy ");
         return false;
       }
     }
@@ -646,31 +603,25 @@ public class BlockPlacementPolicyDefault
     // check if the target rack has chosen too many nodes
     String rackname = node.getNetworkLocation();
     int counter=1;
-    for(Iterator<DatanodeDescriptor> iter = results.iterator();
-        iter.hasNext();) {
-      Node result = iter.next();
+    for(Node result : results) {
       if (rackname.equals(result.getNetworkLocation())) {
         counter++;
       }
     }
     if (counter>maxTargetPerRack) {
-      if(LOG.isDebugEnabled()) {
-        threadLocalBuilder.get().append(node.toString()).append(": ")
-          .append("Node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because the rack has too many chosen nodes ");
-      }
+      logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
       return false;
     }
     return true;
   }
     
-  /* Return a pipeline of nodes.
+  /**
+   * Return a pipeline of nodes.
    * The pipeline is formed finding a shortest path that 
    * starts from the writer and traverses all <i>nodes</i>
    * This is basically a traveling salesman problem.
    */
-  private DatanodeDescriptor[] getPipeline(
-                                           DatanodeDescriptor writer,
+  private DatanodeDescriptor[] getPipeline(Node writer,
                                            DatanodeDescriptor[] nodes) {
     if (nodes.length==0) return nodes;
       
@@ -704,44 +655,38 @@ public class BlockPlacementPolicyDefault
   }
 
   @Override
-  public int verifyBlockPlacement(String srcPath,
-                                  LocatedBlock lBlk,
-                                  int minRacks) {
+  public BlockPlacementStatus verifyBlockPlacement(String srcPath,
+      LocatedBlock lBlk, int numberOfReplicas) {
     DatanodeInfo[] locs = lBlk.getLocations();
     if (locs == null)
-      locs = new DatanodeInfo[0];
+      locs = DatanodeDescriptor.EMPTY_ARRAY;
     int numRacks = clusterMap.getNumOfRacks();
     if(numRacks <= 1) // only one rack
-      return 0;
-    minRacks = Math.min(minRacks, numRacks);
+      return new BlockPlacementStatusDefault(
+          Math.min(numRacks, numberOfReplicas), numRacks);
+    int minRacks = Math.min(2, numberOfReplicas);
     // 1. Check that all locations are different.
     // 2. Count locations on different racks.
     Set<String> racks = new TreeSet<String>();
     for (DatanodeInfo dn : locs)
       racks.add(dn.getNetworkLocation());
-    return minRacks - racks.size();
+    return new BlockPlacementStatusDefault(racks.size(), minRacks);
   }
 
   @Override
   public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
-                                                 Block block,
-                                                 short replicationFactor,
-                                                 Collection<DatanodeDescriptor> first, 
-                                                 Collection<DatanodeDescriptor> second) {
+      Block block, short replicationFactor,
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
     long oldestHeartbeat =
       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
     DatanodeDescriptor oldestHeartbeatNode = null;
     long minSpace = Long.MAX_VALUE;
     DatanodeDescriptor minSpaceNode = null;
 
-    // pick replica from the first Set. If first is empty, then pick replicas
-    // from second set.
-    Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
-
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all hearbeats are within the tolerable heartbeat interval
-    while (iter.hasNext() ) {
-      DatanodeDescriptor node = iter.next();
+    for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
       long free = node.getRemaining();
       long lastHeartbeat = node.getLastUpdate();
       if(lastHeartbeat < oldestHeartbeat) {
@@ -762,12 +707,10 @@ public class BlockPlacementPolicyDefault
    * replica while second set contains remaining replica nodes.
    * So pick up first set if not empty. If first is empty, then pick second.
    */
-  protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+  protected Collection<DatanodeDescriptor> pickupReplicaSet(
       Collection<DatanodeDescriptor> first,
       Collection<DatanodeDescriptor> second) {
-    Iterator<DatanodeDescriptor> iter =
-        first.isEmpty() ? second.iterator() : first.iterator();
-    return iter;
+    return first.isEmpty() ? second : first;
   }
   
   @VisibleForTesting

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Wed Oct 30 22:21:59 2013
@@ -20,9 +20,9 @@ package org.apache.hadoop.hdfs.server.bl
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -63,31 +63,25 @@ public class BlockPlacementPolicyWithNod
    * @return the chosen node
    */
   @Override
-  protected DatanodeDescriptor chooseLocalNode(
-      DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes,
-      long blocksize,
-      int maxNodesPerRack,
-      List<DatanodeDescriptor> results,
-      boolean avoidStaleNodes)
+  protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
         throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
           blocksize, maxNodesPerRack, results, avoidStaleNodes);
 
-    // otherwise try local machine first
-    Node oldNode = excludedNodes.put(localMachine, localMachine);
-    if (oldNode == null) { // was not in the excluded list
-      if (isGoodTarget(localMachine, blocksize,
-          maxNodesPerRack, false, results, avoidStaleNodes)) {
-        results.add(localMachine);
-        // Nodes under same nodegroup should be excluded.
-        addNodeGroupToExcludedNodes(excludedNodes,
-            localMachine.getNetworkLocation());
-        return localMachine;
+    if (localMachine instanceof DatanodeDescriptor) {
+      DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
+      // otherwise try local machine first
+      if (excludedNodes.add(localMachine)) { // was not in the excluded list
+        if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
+            maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
+          return localDataNode;
+        }
       }
-    } 
+    }
 
     // try a node on local node group
     DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
@@ -105,34 +99,10 @@ public class BlockPlacementPolicyWithNod
    * {@inheritDoc}
    */
   @Override
-  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
-      Node chosenNode) {
-    // as node-group aware implementation, it should make sure no two replica
-    // are placing on the same node group.
-    addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
-  }
-  
-  // add all nodes under specific nodegroup to excludedNodes.
-  private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
-      String nodeGroup) {
-    List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
-    for (Node node : leafNodes) {
-      excludedNodes.put(node, node);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  protected DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results,
-                                             boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+  protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
@@ -148,9 +118,7 @@ public class BlockPlacementPolicyWithNod
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
+      for(DatanodeDescriptor nextNode : results) {
         if (nextNode != localMachine) {
           newLocal = nextNode;
           break;
@@ -181,13 +149,9 @@ public class BlockPlacementPolicyWithNod
    */
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
-          DatanodeDescriptor localMachine,
-          HashMap<Node, Node> excludedNodes,
-          long blocksize,
-          int maxReplicasPerRack,
-          List<DatanodeDescriptor> results,
-          boolean avoidStaleNodes)
-          throws NotEnoughReplicasException {
+      DatanodeDescriptor localMachine, Set<Node> excludedNodes,
+      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
 
     final String rackLocation = NetworkTopology.getFirstHalf(
@@ -210,10 +174,11 @@ public class BlockPlacementPolicyWithNod
    * if still no such node is available, choose a random node in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
-      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize, 
-      int maxNodesPerRack, List<DatanodeDescriptor> results, boolean avoidStaleNodes)
-          throws NotEnoughReplicasException {
+  private DatanodeDescriptor chooseLocalNodeGroup(
+      NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
@@ -227,9 +192,7 @@ public class BlockPlacementPolicyWithNod
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-        iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
+      for(DatanodeDescriptor nextNode : results) {
         if (nextNode != localMachine) {
           newLocal = nextNode;
           break;
@@ -264,14 +227,14 @@ public class BlockPlacementPolicyWithNod
    * within the same nodegroup
    * @return number of new excluded nodes
    */
-  protected int addToExcludedNodes(DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes) {
+  @Override
+  protected int addToExcludedNodes(DatanodeDescriptor chosenNode,
+      Set<Node> excludedNodes) {
     int countOfExcludedNodes = 0;
-    String nodeGroupScope = localMachine.getNetworkLocation();
+    String nodeGroupScope = chosenNode.getNetworkLocation();
     List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
     for (Node leafNode : leafNodes) {
-      Node node = excludedNodes.put(leafNode, leafNode);
-      if (node == null) {
+      if (excludedNodes.add(leafNode)) {
         // not a existing node in excludedNodes
         countOfExcludedNodes++;
       }
@@ -290,12 +253,12 @@ public class BlockPlacementPolicyWithNod
    * If first is empty, then pick second.
    */
   @Override
-  public Iterator<DatanodeDescriptor> pickupReplicaSet(
+  public Collection<DatanodeDescriptor> pickupReplicaSet(
       Collection<DatanodeDescriptor> first,
       Collection<DatanodeDescriptor> second) {
     // If no replica within same rack, return directly.
     if (first.isEmpty()) {
-      return second.iterator();
+      return second;
     }
     // Split data nodes in the first set into two sets, 
     // moreThanOne contains nodes on nodegroup with more than one replica
@@ -328,9 +291,7 @@ public class BlockPlacementPolicyWithNod
       }
     }
     
-    Iterator<DatanodeDescriptor> iter =
-        moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
-    return iter;
+    return moreThanOne.isEmpty()? exactlyOne : moreThanOne;
   }
   
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Wed Oct 30 22:21:59 2013
@@ -57,11 +57,11 @@ class BlocksMap {
   /** Constant {@link LightWeightGSet} capacity. */
   private final int capacity;
   
-  private volatile GSet<Block, BlockInfo> blocks;
+  private GSet<Block, BlockInfo> blocks;
 
-  BlocksMap(final float loadFactor) {
+  BlocksMap(int capacity) {
     // Use 2% of total memory to size the GSet capacity
-    this.capacity = LightWeightGSet.computeCapacity(2.0, "BlocksMap");
+    this.capacity = capacity;
     this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
   }
 

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Oct 30 22:21:59 2013
@@ -42,7 +42,8 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeDescriptor extends DatanodeInfo {
-  
+  public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
+
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
   public DecommissioningStatus decommissioningStatus = new DecommissioningStatus();

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Oct 30 22:21:59 2013
@@ -17,21 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.util.Time.now;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.net.InetAddresses;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -41,13 +29,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
@@ -55,32 +38,23 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+import static org.apache.hadoop.util.Time.now;
 
 /**
  * Manage datanodes, include decommission and other activities.
@@ -127,6 +101,8 @@ public class DatanodeManager {
   
   private final int defaultInfoPort;
 
+  private final int defaultInfoSecurePort;
+
   private final int defaultIpcPort;
 
   /** Read include/exclude files*/
@@ -166,6 +142,7 @@ public class DatanodeManager {
    */
   private boolean hasClusterEverBeenMultiRack = false;
 
+  private final boolean checkIpHostnameInRegistration;
   /**
    * The number of datanodes for each software version. This list should change
    * during rolling upgrades.
@@ -188,7 +165,10 @@ public class DatanodeManager {
               DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
     this.defaultInfoPort = NetUtils.createSocketAddr(
           conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
-              DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
+              DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort();
+    this.defaultInfoSecurePort = NetUtils.createSocketAddr(
+        conf.get(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY,
+            DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
     this.defaultIpcPort = NetUtils.createSocketAddr(
           conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
               DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
@@ -231,6 +211,12 @@ public class DatanodeManager {
     LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
         + "=" + this.blockInvalidateLimit);
 
+    this.checkIpHostnameInRegistration = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
+        DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
+    LOG.info(DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY
+        + "=" + checkIpHostnameInRegistration);
+
     this.avoidStaleDataNodesForRead = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
@@ -753,11 +739,13 @@ public class DatanodeManager {
       // Mostly called inside an RPC, update ip and peer hostname
       String hostname = dnAddress.getHostName();
       String ip = dnAddress.getHostAddress();
-      if (!isNameResolved(dnAddress)) {
+      if (checkIpHostnameInRegistration && !isNameResolved(dnAddress)) {
         // Reject registration of unresolved datanode to prevent performance
         // impact of repetitive DNS lookups later.
-        LOG.warn("Unresolved datanode registration from " + ip);
-        throw new DisallowedDatanodeException(nodeReg);
+        final String message = "hostname cannot be resolved (ip="
+            + ip + ", hostname=" + hostname + ")";
+        LOG.warn("Unresolved datanode registration: " + message);
+        throw new DisallowedDatanodeException(nodeReg, message);
       }
       // update node registration with the ip and hostname from rpc request
       nodeReg.setIpAddr(ip);
@@ -886,7 +874,12 @@ public class DatanodeManager {
       // If the network location is invalid, clear the cached mappings
       // so that we have a chance to re-add this DataNode with the
       // correct network location later.
-      dnsToSwitchMapping.reloadCachedMappings();
+      List<String> invalidNodeNames = new ArrayList<String>(3);
+      // clear cache for nodes in IP or Hostname
+      invalidNodeNames.add(nodeReg.getIpAddr());
+      invalidNodeNames.add(nodeReg.getHostName());
+      invalidNodeNames.add(nodeReg.getPeerHostName());
+      dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
       throw e;
     }
   }
@@ -1122,6 +1115,7 @@ public class DatanodeManager {
       // The IP:port is sufficient for listing in a report
       dnId = new DatanodeID(hostStr, "", "", port,
           DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+          DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
           DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
     } else {
       String ipAddr = "";
@@ -1132,6 +1126,7 @@ public class DatanodeManager {
       }
       dnId = new DatanodeID(ipAddr, hostStr, "", port,
           DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+          DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
           DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
     }
     return dnId;
@@ -1179,7 +1174,7 @@ public class DatanodeManager {
               new DatanodeDescriptor(new DatanodeID(entry.getIpAddress(),
                   entry.getPrefix(), "",
                   entry.getPort() == 0 ? defaultXferPort : entry.getPort(),
-                  defaultInfoPort, defaultIpcPort));
+                  defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
           dn.setLastUpdate(0); // Consider this node dead for reporting
           nodes.add(dn);
         }
@@ -1198,17 +1193,17 @@ public class DatanodeManager {
   /**
    * Checks if name resolution was successful for the given address.  If IP
    * address and host name are the same, then it means name resolution has
-   * failed.  As a special case, the loopback address is also considered
+   * failed.  As a special case, local addresses are also considered
    * acceptable.  This is particularly important on Windows, where 127.0.0.1 does
    * not resolve to "localhost".
    *
    * @param address InetAddress to check
-   * @return boolean true if name resolution successful or address is loopback
+   * @return boolean true if name resolution successful or address is local
    */
   private static boolean isNameResolved(InetAddress address) {
     String hostname = address.getHostName();
     String ip = address.getHostAddress();
-    return !hostname.equals(ip) || address.isLoopbackAddress();
+    return !hostname.equals(ip) || NetUtils.isLocalAddress(address);
   }
   
   private void setDatanodeDead(DatanodeDescriptor node) {

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Oct 30 22:21:59 2013
@@ -18,24 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.common;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TreeSet;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.jsp.JspWriter;
+import com.google.common.base.Charsets;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,13 +30,9 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -74,10 +53,22 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
-import com.google.common.base.Charsets;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.jsp.JspWriter;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.*;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
 
 @InterfaceAudience.Private
 public class JspHelper {
@@ -112,7 +103,7 @@ public class JspHelper {
       return super.hashCode();
     }
   }
- 
+
   // compare two records based on their frequency
   private static class NodeRecordComparator implements Comparator<NodeRecord> {
 
@@ -126,6 +117,27 @@ public class JspHelper {
       return 0;
     }
   }
+
+  /**
+   * A helper class that generates the correct URL for different schemes.
+   *
+   */
+  public static final class Url {
+    public static String authority(String scheme, DatanodeID d) {
+      if (scheme.equals("http")) {
+        return d.getInfoAddr();
+      } else if (scheme.equals("https")) {
+        return d.getInfoSecureAddr();
+      } else {
+        throw new IllegalArgumentException("Unknown scheme:" + scheme);
+      }
+    }
+
+    public static String url(String scheme, DatanodeID d) {
+      return scheme + "://" + authority(scheme, d);
+    }
+  }
+
   public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf)
       throws IOException {
     HashMap<DatanodeInfo, NodeRecord> map =
@@ -217,7 +229,7 @@ public class JspHelper {
         offsetIntoBlock, amtToRead,  true,
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
         new DatanodeID(addr.getAddress().getHostAddress(),
-            addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
+            addr.getHostName(), poolId, addr.getPort(), 0, 0, 0), null,
             null, null, false, CachingStrategy.newDefaultStrategy());
         
     final byte[] buf = new byte[amtToRead];

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Wed Oct 30 22:21:59 2013
@@ -100,6 +100,7 @@ class BlockPoolSliceScanner {
   private long currentPeriodStart = Time.now();
   private long bytesLeft = 0; // Bytes to scan in this period
   private long totalBytesToScan = 0;
+  private boolean isNewPeriod = true;
   
   private final LogFileHandler verificationLog;
   
@@ -126,7 +127,10 @@ class BlockPoolSliceScanner {
       public int compare(BlockScanInfo left, BlockScanInfo right) {
         final long l = left.lastScanTime;
         final long r = right.lastScanTime;
-        return l < r? -1: l > r? 1: 0; 
+        // Compare the blocks themselves when scan times are equal, to avoid
+        // treating distinct blocks as duplicates: TreeMap uses the comparator,
+        // if available, to check existence of the object.
+        return l < r? -1: l > r? 1: left.compareTo(right); 
       }
     };
 
@@ -148,8 +152,6 @@ class BlockPoolSliceScanner {
     public boolean equals(Object that) {
       if (this == that) {
         return true;
-      } else if (that == null || !(that instanceof BlockScanInfo)) {
-        return false;
       }
       return super.equals(that);
     }
@@ -539,10 +541,12 @@ class BlockPoolSliceScanner {
                   entry.genStamp));
               if (info != null) {
                 if (processedBlocks.get(entry.blockId) == null) {
-                  updateBytesLeft(-info.getNumBytes());
+                  if (isNewPeriod) {
+                    updateBytesLeft(-info.getNumBytes());
+                  }
                   processedBlocks.put(entry.blockId, 1);
                 }
-                if (logIterator.isPrevious()) {
+                if (logIterator.isLastReadFromPrevious()) {
                   // write the log entry to current file
                   // so that the entry is preserved for later runs.
                   verificationLog.append(entry.verificationTime, entry.genStamp,
@@ -557,6 +561,7 @@ class BlockPoolSliceScanner {
       } finally {
         IOUtils.closeStream(logIterator);
       }
+      isNewPeriod = false;
     }
     
     
@@ -597,6 +602,7 @@ class BlockPoolSliceScanner {
     // reset the byte counts :
     bytesLeft = totalBytesToScan;
     currentPeriodStart = Time.now();
+    isNewPeriod = true;
   }
   
   private synchronized boolean workRemainingInCurrentPeriod() {

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Oct 30 22:21:59 2013
@@ -18,66 +18,10 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -95,38 +39,15 @@ import org.apache.hadoop.hdfs.HDFSPolicy
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.*;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.protocolPB.*;
+import org.apache.hadoop.hdfs.security.token.block.*;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -141,11 +62,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
@@ -166,24 +83,21 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.JvmPauseMonitor;
-import org.apache.hadoop.util.ServicePlugin;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
+import java.io.*;
+import java.net.*;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -265,6 +179,7 @@ public class DataNode extends Configured
   private volatile boolean heartbeatsDisabledForTests = false;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
+  private int infoSecurePort;
   DataNodeMetrics metrics;
   private InetSocketAddress streamingAddr;
   
@@ -389,16 +304,13 @@ public class DataNode extends Configured
     InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
     String infoHost = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
-    this.infoServer = (secureResources == null) 
-        ? new HttpServer.Builder().setName("datanode")
-            .setBindAddress(infoHost).setPort(tmpInfoPort)
-            .setFindPort(tmpInfoPort == 0).setConf(conf)
-            .setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))).build()
-        : new HttpServer.Builder().setName("datanode")
-            .setBindAddress(infoHost).setPort(tmpInfoPort)
-            .setFindPort(tmpInfoPort == 0).setConf(conf)
-            .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
-            .setConnector(secureResources.getListener()).build();
+    HttpServer.Builder builder = new HttpServer.Builder().setName("datanode")
+        .setBindAddress(infoHost).setPort(tmpInfoPort)
+        .setFindPort(tmpInfoPort == 0).setConf(conf)
+        .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")));
+    this.infoServer = (secureResources == null) ? builder.build() :
+        builder.setConnector(secureResources.getListener()).build();
+
     LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort);
     if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
       boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
@@ -412,6 +324,7 @@ public class DataNode extends Configured
       if(LOG.isDebugEnabled()) {
         LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
       }
+      infoSecurePort = secInfoSocAddr.getPort();
     }
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
     this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
@@ -780,7 +693,8 @@ public class DataNode extends Configured
     }
     DatanodeID dnId = new DatanodeID(
         streamingAddr.getAddress().getHostAddress(), hostName, 
-        getStorageId(), getXferPort(), getInfoPort(), getIpcPort());
+        getStorageId(), getXferPort(), getInfoPort(),
+            infoSecurePort, getIpcPort());
     return new DatanodeRegistration(dnId, storageInfo, 
         new ExportedBlockKeys(), VersionInfo.getVersion());
   }
@@ -878,7 +792,7 @@ public class DataNode extends Configured
    * If this is the first block pool to register, this also initializes
    * the datanode-scoped storage.
    * 
-   * @param nsInfo the handshake response from the NN.
+   * @param bpos Block pool offer service
    * @throws IOException if the NN is inconsistent with the local storage.
    */
   void initBlockPool(BPOfferService bpos) throws IOException {
@@ -1403,15 +1317,13 @@ public class DataNode extends Configured
     
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
-      if (LOG.isInfoEnabled()) {
-        StringBuilder xfersBuilder = new StringBuilder();
-        for (int i = 0; i < numTargets; i++) {
-          xfersBuilder.append(xferTargets[i]);
-          xfersBuilder.append(" ");
-        }
-        LOG.info(bpReg + " Starting thread to transfer " + 
-                 block + " to " + xfersBuilder);                       
+      StringBuilder xfersBuilder = new StringBuilder();
+      for (int i = 0; i < numTargets; i++) {
+        xfersBuilder.append(xferTargets[i]);
+        xfersBuilder.append(" ");
       }
+      LOG.info(bpReg + " Starting thread to transfer " + 
+               block + " to " + xfersBuilder);                       
 
       new Daemon(new DataTransfer(xferTargets, block,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
@@ -2336,6 +2248,13 @@ public class DataNode extends Configured
   }
 
   /**
+   * @return the datanode's https port
+   */
+  public int getInfoSecurePort() {
+    return infoSecurePort;
+  }
+
+  /**
    * Returned information is a JSON representation of a map with 
    * name node host name as the key and block pool Id as the value.
    * Note that, if there are multiple NNs in an NA nameservice,



Mime
View raw message