hadoop-hdfs-commits mailing list archives

From: cnaur...@apache.org
Subject: svn commit: r1550774 [3/10] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apach...
Date: Fri, 13 Dec 2013 17:28:18 GMT
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Dec 13 17:28:14 2013
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -44,6 +45,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
@@ -70,8 +72,10 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -470,8 +474,8 @@ public class BlockManager {
   private void dumpBlockMeta(Block block, PrintWriter out) {
     List<DatanodeDescriptor> containingNodes =
                                       new ArrayList<DatanodeDescriptor>();
-    List<DatanodeDescriptor> containingLiveReplicasNodes =
-      new ArrayList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> containingLiveReplicasNodes =
+      new ArrayList<DatanodeStorageInfo>();
     
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
@@ -498,9 +502,8 @@ public class BlockManager {
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
     
-    for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-         jt.hasNext();) {
-      DatanodeDescriptor node = jt.next();
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       String state = "";
       if (corruptNodes != null && corruptNodes.contains(node)) {
         state = "(corrupt)";
@@ -509,7 +512,7 @@ public class BlockManager {
         state = "(decommissioned)";
       }
       
-      if (node.areBlockContentsStale()) {
+      if (storage.areBlockContentsStale()) {
         state += " (block deletions maybe out of date)";
       }
       out.print(" " + node + state + " : ");
@@ -660,10 +663,9 @@ public class BlockManager {
     assert oldBlock == getStoredBlock(oldBlock) :
       "last block of the file is not in blocksMap";
 
-    DatanodeDescriptor[] targets = getNodes(oldBlock);
+    DatanodeStorageInfo[] targets = getStorages(oldBlock);
 
-    BlockInfoUnderConstruction ucBlock =
-      bc.setLastBlock(oldBlock, targets);
+    BlockInfoUnderConstruction ucBlock = bc.setLastBlock(oldBlock, targets);
     blocksMap.replaceBlock(ucBlock);
 
     // Remove block from replication queue.
@@ -673,9 +675,8 @@ public class BlockManager {
     pendingReplications.remove(ucBlock);
 
     // remove this block from the list of pending blocks to be deleted. 
-    for (DatanodeDescriptor dd : targets) {
-      String datanodeId = dd.getStorageID();
-      invalidateBlocks.remove(datanodeId, oldBlock);
+    for (DatanodeStorageInfo storage : targets) {
+      invalidateBlocks.remove(storage.getStorageID(), oldBlock);
     }
     
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -694,18 +695,17 @@ public class BlockManager {
   /**
    * Get all valid locations of the block
    */
-  private List<String> getValidLocations(Block block) {
-    ArrayList<String> machineSet =
-      new ArrayList<String>(blocksMap.numNodes(block));
-    for(Iterator<DatanodeDescriptor> it =
-      blocksMap.nodeIterator(block); it.hasNext();) {
-      String storageID = it.next().getStorageID();
+  private List<DatanodeStorageInfo> getValidLocations(Block block) {
+    final List<DatanodeStorageInfo> locations
+        = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final String storageID = storage.getStorageID();
       // filter invalidate replicas
       if(!invalidateBlocks.contains(storageID, block)) {
-        machineSet.add(storageID);
+        locations.add(storage);
       }
     }
-    return machineSet;
+    return locations;
   }
   
   private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
@@ -773,9 +773,9 @@ public class BlockManager {
             + ", blk=" + blk);
       }
       final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
-      final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+      final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
       final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
-      return new LocatedBlock(eb, locations, pos, false);
+      return new LocatedBlock(eb, storages, pos, false);
     }
 
     // get block locations
@@ -790,15 +790,14 @@ public class BlockManager {
     final int numNodes = blocksMap.numNodes(blk);
     final boolean isCorrupt = numCorruptNodes == numNodes;
     final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
-    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
-      for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
-          it.hasNext();) {
-        final DatanodeDescriptor d = it.next();
+      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
+        final DatanodeDescriptor d = storage.getDatanodeDescriptor();
         final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
         if (isCorrupt || (!isCorrupt && !replicaCorrupt))
-          machines[j++] = d;
+          machines[j++] = storage;
       }
     }
     assert j == machines.length :
@@ -990,13 +989,20 @@ public class BlockManager {
     }
 
     node.resetBlocks();
-    invalidateBlocks.remove(node.getStorageID());
+    invalidateBlocks.remove(node.getDatanodeUuid());
     
     // If the DN hasn't block-reported since the most recent
     // failover, then we may have been holding up on processing
     // over-replicated blocks because of it. But we can now
     // process those blocks.
-    if (node.areBlockContentsStale()) {
+    boolean stale = false;
+    for(DatanodeStorageInfo storage : node.getStorageInfos()) {
+      if (storage.areBlockContentsStale()) {
+        stale = true;
+        break;
+      }
+    }
+    if (stale) {
       rescanPostponedMisreplicatedBlocks();
     }
   }
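
The stale check above must now consider every storage attached to the node.
The same loop could be factored into a small helper; a hypothetical sketch,
not part of the patch:

    private static boolean hasStaleStorage(DatanodeDescriptor node) {
      // A node is treated as stale if any one of its storages has not
      // block-reported since the most recent failover.
      for (DatanodeStorageInfo storage : node.getStorageInfos()) {
        if (storage.areBlockContentsStale()) {
          return true;
        }
      }
      return false;
    }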
@@ -1015,9 +1021,8 @@ public class BlockManager {
    */
   private void addToInvalidates(Block b) {
     StringBuilder datanodes = new StringBuilder();
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
-        .hasNext();) {
-      DatanodeDescriptor node = it.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       invalidateBlocks.add(b, node, false);
       datanodes.append(node).append(" ");
     }
@@ -1035,7 +1040,7 @@ public class BlockManager {
    * for logging purposes
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
-      final DatanodeInfo dn, String reason) throws IOException {
+      final DatanodeInfo dn, String storageID, String reason) throws IOException {
     assert namesystem.hasWriteLock();
     final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
     if (storedBlock == null) {
@@ -1048,11 +1053,11 @@ public class BlockManager {
       return;
     }
     markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
-        Reason.CORRUPTION_REPORTED), dn);
+        Reason.CORRUPTION_REPORTED), dn, storageID);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
-                                  DatanodeInfo dn) throws IOException {
+      DatanodeInfo dn, String storageID) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
       throw new IOException("Cannot mark " + b
@@ -1068,7 +1073,7 @@ public class BlockManager {
     } 
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(b.stored);
+    node.addBlock(storageID, b.stored);
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
@@ -1193,7 +1198,7 @@ public class BlockManager {
   @VisibleForTesting
   int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
-    List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
+    List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
     BlockCollection bc = null;
     int additionalReplRequired;
@@ -1218,7 +1223,7 @@ public class BlockManager {
 
             // get a source data-node
             containingNodes = new ArrayList<DatanodeDescriptor>();
-            liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
             NumberReplicas numReplicas = new NumberReplicas();
             srcNode = chooseSourceDatanode(
                 block, containingNodes, liveReplicaNodes, numReplicas,
@@ -1277,7 +1282,7 @@ public class BlockManager {
     namesystem.writeLock();
     try {
       for(ReplicationWork rw : work){
-        DatanodeDescriptor[] targets = rw.targets;
+        final DatanodeStorageInfo[] targets = rw.targets;
         if(targets == null || targets.length == 0){
           rw.targets = null;
           continue;
@@ -1315,7 +1320,8 @@ public class BlockManager {
 
           if ( (numReplicas.liveReplicas() >= requiredReplication) &&
                (!blockHasEnoughRacks(block)) ) {
-            if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+            if (rw.srcNode.getNetworkLocation().equals(
+                targets[0].getDatanodeDescriptor().getNetworkLocation())) {
               //No use continuing, unless a new rack in this case
               continue;
             }
@@ -1324,15 +1330,13 @@ public class BlockManager {
           // Add block to the to be replicated list
           rw.srcNode.addBlockToBeReplicated(block, targets);
           scheduledWork++;
-
-          for (DatanodeDescriptor dn : targets) {
-            dn.incBlocksScheduled();
-          }
+          DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.increment(block, targets);
+          pendingReplications.increment(block,
+              DatanodeStorageInfo.toDatanodeDescriptors(targets));
           if(blockLog.isDebugEnabled()) {
             blockLog.debug(
                 "BLOCK* block " + block
@@ -1352,12 +1356,12 @@ public class BlockManager {
     if (blockLog.isInfoEnabled()) {
       // log which blocks have been scheduled for replication
       for(ReplicationWork rw : work){
-        DatanodeDescriptor[] targets = rw.targets;
+        DatanodeStorageInfo[] targets = rw.targets;
         if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
           for (int k = 0; k < targets.length; k++) {
             targetList.append(' ');
-            targetList.append(targets[k]);
+            targetList.append(targets[k].getDatanodeDescriptor());
           }
           blockLog.info("BLOCK* ask " + rw.srcNode
               + " to replicate " + rw.block + " to " + targetList);
@@ -1381,15 +1385,16 @@ public class BlockManager {
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
    *      List, boolean, Set, long)
    */
-  public DatanodeDescriptor[] chooseTarget(final String src,
+  public DatanodeStorageInfo[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
       final Set<Node> excludedNodes,
       final long blocksize, List<String> favoredNodes) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors = 
         getDatanodeDescriptors(favoredNodes);
-    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
+    final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize, 
-        favoredDatanodeDescriptors);
+        // TODO: get storage type from file
+        favoredDatanodeDescriptors, StorageType.DEFAULT);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -1450,12 +1455,11 @@ public class BlockManager {
    *         the given block
    */
    @VisibleForTesting
-   DatanodeDescriptor chooseSourceDatanode(
-                                    Block block,
-                                    List<DatanodeDescriptor> containingNodes,
-                                    List<DatanodeDescriptor> nodesContainingLiveReplicas,
-                                    NumberReplicas numReplicas,
-                                    int priority) {
+   DatanodeDescriptor chooseSourceDatanode(Block block,
+       List<DatanodeDescriptor> containingNodes,
+       List<DatanodeStorageInfo>  nodesContainingLiveReplicas,
+       NumberReplicas numReplicas,
+       int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
@@ -1463,12 +1467,12 @@ public class BlockManager {
     int decommissioned = 0;
     int corrupt = 0;
     int excess = 0;
-    Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+    
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
-    while(it.hasNext()) {
-      DatanodeDescriptor node = it.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       LightWeightLinkedSet<Block> excessBlocks =
-        excessReplicateMap.get(node.getStorageID());
+        excessReplicateMap.get(node.getDatanodeUuid());
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt++;
       else if (node.isDecommissionInProgress() || node.isDecommissioned())
@@ -1476,7 +1480,7 @@ public class BlockManager {
       else if (excessBlocks != null && excessBlocks.contains(block)) {
         excess++;
       } else {
-        nodesContainingLiveReplicas.add(node);
+        nodesContainingLiveReplicas.add(storage);
         live++;
       }
       containingNodes.add(node);
@@ -1608,10 +1612,11 @@ public class BlockManager {
   }
 
   /**
-   * The given datanode is reporting all its blocks.
-   * Update the (machine-->blocklist) and (block-->machinelist) maps.
+   * The given storage is reporting all its blocks.
+   * Update the (storage-->block list) and (block-->storage list) maps.
    */
-  public void processReport(final DatanodeID nodeID, final String poolId,
+  public void processReport(final DatanodeID nodeID,
+      final DatanodeStorage storage, final String poolId,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
@@ -1625,26 +1630,28 @@ public class BlockManager {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
+      final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
+      if (namesystem.isInStartupSafeMode()
+          && storageInfo.getBlockReportCount() > 0) {
         blockLog.info("BLOCK* processReport: "
             + "discarded non-initial block report from " + nodeID
             + " because namenode still in startup phase");
         return;
       }
 
-      if (node.numBlocks() == 0) {
+      if (storageInfo.numBlocks() == 0) {
         // The first block report can be processed a lot more efficiently than
         // ordinary block reports.  This shortens restart times.
-        processFirstBlockReport(node, newReport);
+        processFirstBlockReport(node, storage.getStorageID(), newReport);
       } else {
-        processReport(node, newReport);
+        processReport(node, storage, newReport);
       }
       
       // Now that we have an up-to-date block report, we know that any
       // deletions from a previous NN iteration have been accounted for.
-      boolean staleBefore = node.areBlockContentsStale();
-      node.receivedBlockReport();
-      if (staleBefore && !node.areBlockContentsStale()) {
+      boolean staleBefore = storageInfo.areBlockContentsStale();
+      storageInfo.receivedBlockReport();
+      if (staleBefore && !storageInfo.areBlockContentsStale()) {
         LOG.info("BLOCK* processReport: Received first block report from "
             + node + " after starting up or becoming active. Its block "
             + "contents are no longer considered stale");
@@ -1698,28 +1705,30 @@ public class BlockManager {
   }
   
   private void processReport(final DatanodeDescriptor node,
+      final DatanodeStorage storage,
       final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toRemove = new LinkedList<Block>();
+    Collection<Block> toRemove = new TreeSet<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+    reportDiff(node, storage, report,
+        toAdd, toRemove, toInvalidate, toCorrupt, toUC);
 
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node);
+      addStoredBlockUnderConstruction(b, node, storage.getStorageID());
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1733,7 +1742,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+      markBlockAsCorrupt(b, node, storage.getStorageID());
     }
   }
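
A subtle change in this hunk: toRemove becomes a TreeSet<Block> instead of a
LinkedList<Block>, so a block that the diff reports as gone more than once is
only queued for removal once. A self-contained illustration of the difference
(plain longs standing in for Block, which is Comparable in HDFS; uses
java.util.{Collection, LinkedList, TreeSet}):

    Collection<Long> asList = new LinkedList<Long>();
    Collection<Long> asSet = new TreeSet<Long>();
    for (long blockId : new long[] {3L, 1L, 3L}) {  // block 3 seen twice
      asList.add(blockId);
      asSet.add(blockId);
    }
    // asList = [3, 1, 3] -> removeStoredBlock would run twice for block 3
    // asSet  = [1, 3]    -> each block is removed exactly once, in sorted order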
 
@@ -1749,10 +1758,11 @@ public class BlockManager {
    * @throws IOException 
    */
   private void processFirstBlockReport(final DatanodeDescriptor node,
+      final String storageID,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
-    assert (node.numBlocks() == 0);
+    assert (node.getStorageInfo(storageID).numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
     while(itBR.hasNext()) {
@@ -1761,7 +1771,7 @@ public class BlockManager {
       
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
-        queueReportedBlock(node, iblk, reportedState,
+        queueReportedBlock(node, storageID, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
@@ -1778,10 +1788,10 @@ public class BlockManager {
         if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
-          queueReportedBlock(node, iblk, reportedState,
+          queueReportedBlock(node, storageID, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c, node);
+          markBlockAsCorrupt(c, node, storageID);
         }
         continue;
       }
@@ -1789,7 +1799,7 @@ public class BlockManager {
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            node, iblk, reportedState);
+            node.getStorageInfo(storageID), iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
@@ -1802,22 +1812,25 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, node);
+        addStoredBlockImmediate(storedBlock, node, storageID);
       }
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, 
+  private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+
+    final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
+
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
-    boolean added = dn.addBlock(delimiter);
+    boolean added = storageInfo.addBlock(delimiter);
     assert added : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
     int curIndex;
@@ -1829,20 +1842,21 @@ public class BlockManager {
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
-                                  toAdd, toInvalidate, toCorrupt, toUC);
+      BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+          iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
+
       // move block to the head of the list
       if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
-        headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
+        headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
       }
     }
+
     // collect blocks that have not been reported
     // all of them are next to the delimiter
-    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
-        delimiter.getNext(0), dn);
+    Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
     while(it.hasNext())
       toRemove.add(it.next());
-    dn.removeBlock(delimiter);
+    storageInfo.removeBlock(delimiter);
   }
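
The delimiter idiom used by reportDiff is worth spelling out: a dummy block is
pushed onto the head of the storage's block list, every reported block is
moved in front of it, and whatever remains behind the delimiter afterwards was
not reported. A self-contained model of the technique with a plain list
(hypothetical strings, not the HDFS classes; uses java.util.{ArrayList,
Arrays, LinkedList, List}):

    List<String> storageBlocks =
        new LinkedList<String>(Arrays.asList("b1", "b2", "b3"));
    final String delimiter = "<delim>";
    storageBlocks.add(0, delimiter);        // delimiter starts at the head
    for (String reported : Arrays.asList("b2", "b1")) {
      if (storageBlocks.remove(reported)) {
        storageBlocks.add(0, reported);     // move reported blocks to the head
      }
    }
    // Blocks still behind the delimiter were absent from the report:
    int d = storageBlocks.indexOf(delimiter);
    List<String> toRemove =
        new ArrayList<String>(storageBlocks.subList(d + 1, storageBlocks.size()));
    storageBlocks.remove(delimiter);
    // toRemove == [b3]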
 
   /**
@@ -1876,7 +1890,8 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
+  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
+      final String storageID,
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
@@ -1891,7 +1906,7 @@ public class BlockManager {
   
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(dn, block, reportedState,
+      queueReportedBlock(dn, storageID, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
@@ -1912,7 +1927,7 @@ public class BlockManager {
     }
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn.getStorageID(), block)) {
+    if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
 /*  TODO: following assertion is incorrect, see HDFS-2668
 assert storedBlock.findDatanode(dn) < 0 : "Block " + block
         + " in recentInvalidatesSet should not appear in DN " + dn; */
@@ -1926,7 +1941,7 @@ assert storedBlock.findDatanode(dn) < 0 
         // If the block is an out-of-date generation stamp or state,
         // but we're the standby, we shouldn't treat it as corrupt,
         // but instead just queue it for later processing.
-        queueReportedBlock(dn, storedBlock, reportedState,
+        queueReportedBlock(dn, storageID, storedBlock, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
       } else {
         toCorrupt.add(c);
@@ -1955,7 +1970,7 @@ assert storedBlock.findDatanode(dn) < 0 
    * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
-  private void queueReportedBlock(DatanodeDescriptor dn, Block block,
+  private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
     
@@ -1965,7 +1980,7 @@ assert storedBlock.findDatanode(dn) < 0 
           " from datanode " + dn + " for later processing " +
           "because " + reason + ".");
     }
-    pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
+    pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
   }
 
   /**
@@ -1988,8 +2003,8 @@ assert storedBlock.findDatanode(dn) < 0 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing previouly queued message " + rbi);
       }
-      processAndHandleReportedBlock(
-          rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
+      processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), 
+          rbi.getBlock(), rbi.getReportedState(), null);
     }
   }
   
@@ -2106,19 +2121,21 @@ assert storedBlock.findDatanode(dn) < 0 
       return false;
     }
   }
-  
+
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
-      DatanodeDescriptor node) throws IOException {
+      DatanodeDescriptor node, String storageID) throws IOException {
     BlockInfoUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(node, ucBlock.reportedBlock, ucBlock.reportedState);
+    block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
+        ucBlock.reportedBlock, ucBlock.reportedState);
+
     if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
-      addStoredBlock(block, node, null, true);
+      addStoredBlock(block, node, storageID, null, true);
     }
-  }
-  
+  } 
+
   /**
    * Faster version of
-   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
    * , intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock(). Assumes this method is
    * called "immediately" so there is no need to refresh the storedBlock from
@@ -2129,17 +2146,17 @@ assert storedBlock.findDatanode(dn) < 0 
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
-                               DatanodeDescriptor node)
+      DatanodeDescriptor node, String storageID)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, node, null, false);
+      addStoredBlock(storedBlock, node, storageID, null, false);
       return;
     }
 
     // just add it
-    node.addBlock(storedBlock);
+    node.addBlock(storageID, storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2162,6 +2179,7 @@ assert storedBlock.findDatanode(dn) < 0 
    */
   private Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
+                               String storageID,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
@@ -2187,7 +2205,7 @@ assert storedBlock.findDatanode(dn) < 0 
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = node.addBlock(storedBlock);
+    boolean added = node.addBlock(storageID, storedBlock);
 
     int curReplicaDelta;
     if (added) {
@@ -2447,19 +2465,19 @@ assert storedBlock.findDatanode(dn) < 0 
     Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
-      if (cur.areBlockContentsStale()) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      if (storage.areBlockContentsStale()) {
         LOG.info("BLOCK* processOverReplicatedBlock: " +
             "Postponing processing of over-replicated " +
-            block + " since datanode " + cur + " does not yet have up-to-date " +
+            block + " since storage + " + storage
+            + "datanode " + cur + " does not yet have up-to-date " +
             "block information.");
         postponeBlock(block);
         return;
       }
       LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
-          .getStorageID());
+          .getDatanodeUuid());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
@@ -2548,10 +2566,10 @@ assert storedBlock.findDatanode(dn) < 0 
 
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
-    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
+    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
     if (excessBlocks == null) {
       excessBlocks = new LightWeightLinkedSet<Block>();
-      excessReplicateMap.put(dn.getStorageID(), excessBlocks);
+      excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
     }
     if (excessBlocks.add(block)) {
       excessBlocksCount.incrementAndGet();
@@ -2599,7 +2617,7 @@ assert storedBlock.findDatanode(dn) < 0 
       // in "excess" there.
       //
       LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
-          .getStorageID());
+          .getDatanodeUuid());
       if (excessBlocks != null) {
         if (excessBlocks.remove(block)) {
           excessBlocksCount.decrementAndGet();
@@ -2608,7 +2626,7 @@ assert storedBlock.findDatanode(dn) < 0 
                 + block + " is removed from excessBlocks");
           }
           if (excessBlocks.size() == 0) {
-            excessReplicateMap.remove(node.getStorageID());
+            excessReplicateMap.remove(node.getDatanodeUuid());
           }
         }
       }
@@ -2623,12 +2641,18 @@ assert storedBlock.findDatanode(dn) < 0 
    * return the length of the added block; 0 if the block is not added
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
-    final List<String> machineSet = getValidLocations(block);
-    if(machineSet.size() == 0) {
+    final List<DatanodeStorageInfo> locations = getValidLocations(block);
+    if(locations.size() == 0) {
       return 0;
     } else {
-      results.add(new BlockWithLocations(block, 
-          machineSet.toArray(new String[machineSet.size()])));
+      final String[] datanodeUuids = new String[locations.size()];
+      final String[] storageIDs = new String[datanodeUuids.length];
+      for(int i = 0; i < locations.size(); i++) {
+        final DatanodeStorageInfo s = locations.get(i);
+        datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
+        storageIDs[i] = s.getStorageID();
+      }
+      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
       return block.getNumBytes();
     }
   }
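
The rewritten addBlock builds two parallel arrays, so index i of the datanode
UUIDs lines up with index i of the storage IDs inside each BlockWithLocations.
A sketch of a consumer walking them together (the accessor names below are
assumed to match the constructor shown here):

    for (BlockWithLocations blw : results) {
      final String[] uuids = blw.getDatanodeUuids();  // assumed accessor
      final String[] sids = blw.getStorageIDs();      // assumed accessor
      for (int i = 0; i < uuids.length; i++) {
        System.out.println(blw.getBlock() + " on " + uuids[i] + "/" + sids[i]);
      }
    }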
@@ -2637,12 +2661,12 @@ assert storedBlock.findDatanode(dn) < 0 
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
       throws IOException {
-    // decrement number of blocks scheduled to this datanode.
+    // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
     // RECEIVED_BLOCK), we currently also decrease the approximate number. 
-    node.decBlocksScheduled();
+    node.decrementBlocksScheduled();
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
@@ -2658,11 +2682,12 @@ assert storedBlock.findDatanode(dn) < 0 
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.decrement(block, node);
-    processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
+    processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
         delHintNode);
   }
   
-  private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
+  private void processAndHandleReportedBlock(DatanodeDescriptor node,
+      String storageID, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
@@ -2670,7 +2695,7 @@ assert storedBlock.findDatanode(dn) < 0 
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, block, reportedState,
+    processReportedBlock(node, storageID, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2678,11 +2703,11 @@ assert storedBlock.findDatanode(dn) < 0 
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node);
+      addStoredBlockUnderConstruction(b, node, storageID);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2696,7 +2721,7 @@ assert storedBlock.findDatanode(dn) < 0 
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+      markBlockAsCorrupt(b, node, storageID);
     }
   }
 
@@ -2708,7 +2733,7 @@ assert storedBlock.findDatanode(dn) < 0 
    * This method must be called with FSNamesystem lock held.
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+      final String poolId, final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     assert namesystem.hasWriteLock();
     int received = 0;
@@ -2724,19 +2749,19 @@ assert storedBlock.findDatanode(dn) < 0 
           "Got incremental block report from unregistered or dead node");
     }
 
-    for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+    for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
       switch (rdbi.getStatus()) {
       case DELETED_BLOCK:
         removeStoredBlock(rdbi.getBlock(), node);
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, rdbi.getBlock(),
+        processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(),
             ReplicaState.RBW, null);
         break;
       default:
@@ -2768,24 +2793,23 @@ assert storedBlock.findDatanode(dn) < 0 
     int corrupt = 0;
     int excess = 0;
     int stale = 0;
-    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    while (nodeIter.hasNext()) {
-      DatanodeDescriptor node = nodeIter.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
       } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         decommissioned++;
       } else {
         LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
-            .getStorageID());
+            .getDatanodeUuid());
         if (blocksExcess != null && blocksExcess.contains(b)) {
           excess++;
         } else {
           live++;
         }
       }
-      if (node.areBlockContentsStale()) {
+      if (storage.areBlockContentsStale()) {
         stale++;
       }
     }
@@ -2808,10 +2832,9 @@ assert storedBlock.findDatanode(dn) < 0 
     }
     // else proceed with fast case
     int live = 0;
-    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    while (nodeIter.hasNext()) {
-      DatanodeDescriptor node = nodeIter.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
         live++;
     }
@@ -2823,10 +2846,9 @@ assert storedBlock.findDatanode(dn) < 0 
     int curReplicas = num.liveReplicas();
     int curExpectedReplicas = getReplication(block);
     BlockCollection bc = blocksMap.getBlockCollection(block);
-    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
     StringBuilder nodeList = new StringBuilder();
-    while (nodeIter.hasNext()) {
-      DatanodeDescriptor node = nodeIter.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       nodeList.append(node);
       nodeList.append(" ");
     }
@@ -2923,14 +2945,13 @@ assert storedBlock.findDatanode(dn) < 0 
     return blocksMap.size();
   }
 
-  public DatanodeDescriptor[] getNodes(BlockInfo block) {
-    DatanodeDescriptor[] nodes =
-      new DatanodeDescriptor[block.numNodes()];
-    Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-    for (int i = 0; it != null && it.hasNext(); i++) {
-      nodes[i] = it.next();
+  public DatanodeStorageInfo[] getStorages(BlockInfo block) {
+    final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
+    int i = 0;
+    for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
+      storages[i++] = s;
     }
-    return nodes;
+    return storages;
   }
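
Callers that previously consumed getNodes(block) now receive storages and take
one extra hop to reach the datanode. A minimal usage sketch:

    DatanodeStorageInfo[] storages = blockManager.getStorages(storedBlock);
    for (DatanodeStorageInfo s : storages) {
      DatanodeDescriptor dn = s.getDatanodeDescriptor();  // one hop away
      System.out.println(dn + " holds a replica on storage " + s.getStorageID());
    }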
 
   public int getTotalBlocks() {
@@ -3059,9 +3080,8 @@ assert storedBlock.findDatanode(dn) < 0 
                                   corruptReplicas.getNodes(b);
     int numExpectedReplicas = getReplication(b);
     String rackName = null;
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); 
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
           if (numExpectedReplicas == 1 ||
@@ -3105,8 +3125,8 @@ assert storedBlock.findDatanode(dn) < 0 
   }
 
   /** @return an iterator of the datanodes. */
-  public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
-    return blocksMap.nodeIterator(block);
+  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
+    return blocksMap.getStorages(block);
   }
 
   public int numCorruptReplicas(Block block) {
@@ -3257,24 +3277,24 @@ assert storedBlock.findDatanode(dn) < 0 
 
     private DatanodeDescriptor srcNode;
     private List<DatanodeDescriptor> containingNodes;
-    private List<DatanodeDescriptor> liveReplicaNodes;
+    private List<DatanodeStorageInfo> liveReplicaStorages;
     private int additionalReplRequired;
 
-    private DatanodeDescriptor targets[];
+    private DatanodeStorageInfo targets[];
     private int priority;
 
     public ReplicationWork(Block block,
         BlockCollection bc,
         DatanodeDescriptor srcNode,
         List<DatanodeDescriptor> containingNodes,
-        List<DatanodeDescriptor> liveReplicaNodes,
+        List<DatanodeStorageInfo> liveReplicaStorages,
         int additionalReplRequired,
         int priority) {
       this.block = block;
       this.bc = bc;
       this.srcNode = srcNode;
       this.containingNodes = containingNodes;
-      this.liveReplicaNodes = liveReplicaNodes;
+      this.liveReplicaStorages = liveReplicaStorages;
       this.additionalReplRequired = additionalReplRequired;
       this.priority = priority;
       this.targets = null;
@@ -3283,8 +3303,8 @@ assert storedBlock.findDatanode(dn) < 0 
     private void chooseTargets(BlockPlacementPolicy blockplacement,
         Set<Node> excludedNodes) {
       targets = blockplacement.chooseTarget(bc.getName(),
-          additionalReplRequired, srcNode, liveReplicaNodes, false,
-          excludedNodes, block.getNumBytes());
+          additionalReplRequired, srcNode, liveReplicaStorages, false,
+          excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
     }
   }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Fri Dec 13 17:28:14 2013
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -67,13 +68,14 @@ public abstract class BlockPlacementPoli
    * @return array of DatanodeDescriptor instances chosen as target
    * and sorted as a pipeline.
    */
-  public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
                                              int numOfReplicas,
                                              Node writer,
-                                             List<DatanodeDescriptor> chosenNodes,
+                                             List<DatanodeStorageInfo> chosen,
                                              boolean returnChosenNodes,
                                              Set<Node> excludedNodes,
-                                             long blocksize);
+                                             long blocksize,
+                                             StorageType storageType);
   
   /**
    * Same as {@link #chooseTarget(String, int, Node, List, boolean, 
@@ -82,16 +84,19 @@ public abstract class BlockPlacementPoli
    *          is only a hint and due to cluster state, namenode may not be 
    *          able to place the blocks on these datanodes.
    */
-  DatanodeDescriptor[] chooseTarget(String src,
+  DatanodeStorageInfo[] chooseTarget(String src,
       int numOfReplicas, Node writer,
       Set<Node> excludedNodes,
-      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+      long blocksize,
+      List<DatanodeDescriptor> favoredNodes,
+      StorageType storageType) {
     // This class does not provide the functionality of placing
     // a block in favored datanodes. The implementations of this class
     // are expected to provide this functionality
+
     return chooseTarget(src, numOfReplicas, writer, 
-        new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes, 
-        blocksize);
+        new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
+        excludedNodes, blocksize, storageType);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Dec 13 17:28:14 2013
@@ -29,11 +29,14 @@ import java.util.TreeSet;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -103,99 +106,101 @@ public class BlockPlacementPolicyDefault
   }
 
   @Override
-  public DatanodeDescriptor[] chooseTarget(String srcPath,
+  public DatanodeStorageInfo[] chooseTarget(String srcPath,
                                     int numOfReplicas,
                                     Node writer,
-                                    List<DatanodeDescriptor> chosenNodes,
+                                    List<DatanodeStorageInfo> chosenNodes,
                                     boolean returnChosenNodes,
                                     Set<Node> excludedNodes,
-                                    long blocksize) {
+                                    long blocksize,
+                                    StorageType storageType) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
-        excludedNodes, blocksize);
+        excludedNodes, blocksize, storageType);
   }
 
   @Override
-  DatanodeDescriptor[] chooseTarget(String src,
+  DatanodeStorageInfo[] chooseTarget(String src,
       int numOfReplicas,
       Node writer,
       Set<Node> excludedNodes,
       long blocksize,
-      List<DatanodeDescriptor> favoredNodes) {
+      List<DatanodeDescriptor> favoredNodes,
+      StorageType storageType) {
     try {
       if (favoredNodes == null || favoredNodes.size() == 0) {
         // Favored nodes not specified, fall back to regular block placement.
         return chooseTarget(src, numOfReplicas, writer,
-            new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
-            excludedNodes, blocksize);
+            new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, 
+            excludedNodes, blocksize, storageType);
       }
 
       Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
           new HashSet<Node>() : new HashSet<Node>(excludedNodes);
 
       // Choose favored nodes
-      List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
+      List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
       boolean avoidStaleNodes = stats != null
           && stats.isAvoidingStaleDataNodesForWrite();
       for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
         DatanodeDescriptor favoredNode = favoredNodes.get(i);
         // Choose a single node which is local to favoredNode.
         // 'results' is updated within chooseLocalNode
-        DatanodeDescriptor target = chooseLocalNode(favoredNode,
+        final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize, 
-            getMaxNodesPerRack(results, 
-                numOfReplicas)[1], results, avoidStaleNodes);
+            getMaxNodesPerRack(results.size(), numOfReplicas)[1],
+            results, avoidStaleNodes, storageType);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode); 
           continue;
         }
-        favoriteAndExcludedNodes.add(target);
+        favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
       }
 
       if (results.size() < numOfReplicas) {
         // Not enough favored nodes, choose other nodes.
         numOfReplicas -= results.size();
-        DatanodeDescriptor[] remainingTargets = 
+        DatanodeStorageInfo[] remainingTargets = 
             chooseTarget(src, numOfReplicas, writer, results,
-                false, favoriteAndExcludedNodes, blocksize);
+                false, favoriteAndExcludedNodes, blocksize, storageType);
         for (int i = 0; i < remainingTargets.length; i++) {
           results.add(remainingTargets[i]);
         }
       }
       return getPipeline(writer,
-          results.toArray(new DatanodeDescriptor[results.size()]));
+          results.toArray(new DatanodeStorageInfo[results.size()]));
     } catch (NotEnoughReplicasException nr) {
       // Fall back to regular block placement disregarding favored nodes hint
       return chooseTarget(src, numOfReplicas, writer, 
-          new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
-          excludedNodes, blocksize);
+          new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, 
+          excludedNodes, blocksize, storageType);
     }
   }
 
   /** This is the implementation. */
-  private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                     Node writer,
-                                    List<DatanodeDescriptor> chosenNodes,
+                                    List<DatanodeStorageInfo> chosenStorage,
                                     boolean returnChosenNodes,
                                     Set<Node> excludedNodes,
-                                    long blocksize) {
+                                    long blocksize,
+                                    StorageType storageType) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return DatanodeDescriptor.EMPTY_ARRAY;
+      return DatanodeStorageInfo.EMPTY_ARRAY;
     }
       
     if (excludedNodes == null) {
       excludedNodes = new HashSet<Node>();
     }
      
-    int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
+    int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
     numOfReplicas = result[0];
     int maxNodesPerRack = result[1];
       
-    List<DatanodeDescriptor> results = 
-      new ArrayList<DatanodeDescriptor>(chosenNodes);
-    for (DatanodeDescriptor node:chosenNodes) {
+    final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
+    for (DatanodeStorageInfo storage : chosenStorage) {
       // add localMachine and related nodes to excludedNodes
-      addToExcludedNodes(node, excludedNodes);
+      addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
     }
       
     if (!clusterMap.contains(writer)) {
@@ -205,20 +210,19 @@ public class BlockPlacementPolicyDefault
     boolean avoidStaleNodes = (stats != null
         && stats.isAvoidingStaleDataNodesForWrite());
     Node localNode = chooseTarget(numOfReplicas, writer,
-        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
     if (!returnChosenNodes) {  
-      results.removeAll(chosenNodes);
+      results.removeAll(chosenStorage);
     }
       
     // sorting nodes to form a pipeline
     return getPipeline((writer==null)?localNode:writer,
-                       results.toArray(new DatanodeDescriptor[results.size()]));
+                       results.toArray(new DatanodeStorageInfo[results.size()]));
   }
 
-  private int[] getMaxNodesPerRack(List<DatanodeDescriptor> chosenNodes,
-      int numOfReplicas) {
+  private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
     int clusterSize = clusterMap.getNumOfLeaves();
-    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    int totalNumOfReplicas = numOfChosen + numOfReplicas;
     if (totalNumOfReplicas > clusterSize) {
       numOfReplicas -= (totalNumOfReplicas-clusterSize);
       totalNumOfReplicas = clusterSize;
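
The arithmetic above trims the request so that chosen + requested replicas never exceeds the number of datanodes; the remainder of getMaxNodesPerRack falls outside this hunk. A worked sketch of just the visible clamping:

    public class ClampExample {
      // Mirrors the visible clamping logic at the top of getMaxNodesPerRack.
      static int[] clamp(int clusterSize, int numOfChosen, int numOfReplicas) {
        int totalNumOfReplicas = numOfChosen + numOfReplicas;
        if (totalNumOfReplicas > clusterSize) {
          numOfReplicas -= (totalNumOfReplicas - clusterSize);
          totalNumOfReplicas = clusterSize;
        }
        return new int[] { numOfReplicas, totalNumOfReplicas };
      }

      public static void main(String[] args) {
        // 5 datanodes, 2 replicas already chosen, 6 more requested:
        int[] r = clamp(5, 2, 6);
        // Prints "3 5": the request shrinks so the total never exceeds cluster size.
        System.out.println(r[0] + " " + r[1]);
      }
    }
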
@@ -243,8 +247,9 @@ public class BlockPlacementPolicyDefault
                                           Set<Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results,
-                                          final boolean avoidStaleNodes) {
+                                          List<DatanodeStorageInfo> results,
+                                          final boolean avoidStaleNodes,
+                                          StorageType storageType) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
@@ -253,7 +258,7 @@ public class BlockPlacementPolicyDefault
     int numOfResults = results.size();
     boolean newBlock = (numOfResults==0);
     if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
-      writer = results.get(0);
+      writer = results.get(0).getDatanodeDescriptor();
     }
 
     // Keep a copy of original excludedNodes
@@ -261,42 +266,49 @@ public class BlockPlacementPolicyDefault
         new HashSet<Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+        writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes, storageType)
+                .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
+      final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
       if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
+            results, avoidStaleNodes, storageType);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 2) {
-        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
-          chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, 
-                           results, avoidStaleNodes);
+        final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
+        if (clusterMap.isOnSameRack(dn0, dn1)) {
+          chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
+              results, avoidStaleNodes, storageType);
         } else if (newBlock){
-          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results, avoidStaleNodes);
+          chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
+              results, avoidStaleNodes, storageType);
         } else {
           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes);
+              results, avoidStaleNodes, storageType);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes);
+          maxNodesPerRack, results, avoidStaleNodes, storageType);
     } catch (NotEnoughReplicasException e) {
-      LOG.warn("Not able to place enough replicas, still in need of "
-               + (totalReplicasExpected - results.size()) + " to reach "
-               + totalReplicasExpected + "\n"
-               + e.getMessage());
+      final String message = "Failed to place enough replicas, still in need of "
+          + (totalReplicasExpected - results.size()) + " to reach "
+          + totalReplicasExpected + ".";
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(message, e);
+      } else {
+        LOG.warn(message + " " + e.getMessage());
+      }
+
       if (avoidStaleNodes) {
         // Retry chooseTarget again, this time not avoiding stale nodes.
 
@@ -304,14 +316,14 @@ public class BlockPlacementPolicyDefault
         // not chosen because they were stale, decommissioned, etc.
         // We need to additionally exclude the nodes that were added to the 
         // result list in the successful calls to choose*() above.
-        for (Node node : results) {
-          oldExcludedNodes.add(node);
+        for (DatanodeStorageInfo resultStorage : results) {
+          oldExcludedNodes.add(resultStorage.getDatanodeDescriptor());
         }
         // Set numOfReplicas, since it can get out of sync with the result list
         // if the NotEnoughReplicasException was thrown in chooseRandom().
         numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
-            maxNodesPerRack, results, false);
+            maxNodesPerRack, results, false, storageType);
       }
     }
     return writer;
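
On NotEnoughReplicasException with avoidStaleNodes set, the code widens the excluded set with everything already placed, recomputes numOfReplicas from the result list, and retries accepting stale nodes. A toy, self-contained sketch of that retry shape (the chooser here is a stand-in, not the real chooseTarget):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class StaleRetrySketch {
      static class NotEnoughReplicasException extends Exception {}

      // Toy chooser: with avoidStale=true it can only ever place one target.
      static void choose(int n, Set<String> excluded, List<String> results,
          boolean avoidStale) throws NotEnoughReplicasException {
        results.add(avoidStale ? "freshDn" : "staleDn");
        if (n > 1 && avoidStale) {
          throw new NotEnoughReplicasException();
        }
        for (int i = 1; i < n; i++) {
          results.add("staleDn" + i);
        }
      }

      public static void main(String[] args) {
        int total = 3;
        Set<String> excluded = new HashSet<String>();
        List<String> results = new ArrayList<String>();
        Set<String> oldExcluded = new HashSet<String>(excluded);
        try {
          choose(total, excluded, results, true);
        } catch (NotEnoughReplicasException e) {
          // Re-exclude what was already placed, then retry accepting stale
          // nodes; recompute the count, since results grew before the throw.
          oldExcluded.addAll(results);
          try {
            choose(total - results.size(), oldExcluded, results, false);
          } catch (NotEnoughReplicasException e2) {
            // give up; the caller logs the shortfall
          }
        }
        System.out.println(results); // [freshDn, staleDn, staleDn1]
      }
    }
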
@@ -321,32 +333,36 @@ public class BlockPlacementPolicyDefault
    * Choose <i>localMachine</i> as the target.
   * If <i>localMachine</i> is not available, 
   * choose a node on the same rack.
-   * @return the chosen node
+   * @return the chosen storage
    */
-  protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+  protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
                                              Set<Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results,
-                                             boolean avoidStaleNodes)
+                                             List<DatanodeStorageInfo> results,
+                                             boolean avoidStaleNodes,
+                                             StorageType storageType)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes);
+          maxNodesPerRack, results, avoidStaleNodes, storageType);
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize,
-            maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
-          return localDatanode;
+        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+            localDatanode.getStorageInfos())) {
+          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
+            return localStorage;
+          }
         }
       } 
     }      
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
-        maxNodesPerRack, results, avoidStaleNodes);
+        maxNodesPerRack, results, avoidStaleNodes, storageType);
   }
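
chooseLocalStorage now probes the local datanode's storages in shuffled order and returns the first one that passes addIfIsGoodTarget. A sketch of the shuffle-then-probe idiom, approximating DFSUtil.shuffle with Collections.shuffle and the goodness test with a trivial predicate:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ShuffleProbe {
      // Stand-in for addIfIsGoodTarget(...) >= 0.
      static boolean isGoodTarget(String storage) {
        return !storage.endsWith("READ_ONLY");
      }

      static String chooseLocalStorage(String[] storages) {
        List<String> shuffled = Arrays.asList(storages.clone());
        Collections.shuffle(shuffled); // spread writes across a node's disks
        for (String s : shuffled) {
          if (isGoodTarget(s)) {
            return s; // first acceptable storage wins
          }
        }
        return null; // caller falls through to chooseLocalRack
      }

      public static void main(String[] args) {
        String[] storages = { "DS-1:READ_ONLY", "DS-2", "DS-3" };
        System.out.println(chooseLocalStorage(storages)); // DS-2 or DS-3
      }
    }
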
   
   /**
@@ -368,27 +384,29 @@ public class BlockPlacementPolicyDefault
    * in the cluster.
-   * @return the chosen node
+   * @return the chosen storage
    */
-  protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+  protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
                                              Set<Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results,
-                                             boolean avoidStaleNodes)
+                                             List<DatanodeStorageInfo> results,
+                                             boolean avoidStaleNodes,
+                                             StorageType storageType)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes);
+          maxNodesPerRack, results, avoidStaleNodes, storageType);
     }
       
     // choose one from the local rack
     try {
       return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
-          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
-      for(DatanodeDescriptor nextNode : results) {
+      for(DatanodeStorageInfo resultStorage : results) {
+        DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
         if (nextNode != localMachine) {
           newLocal = nextNode;
           break;
@@ -397,16 +415,16 @@ public class BlockPlacementPolicyDefault
       if (newLocal != null) {
         try {
           return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes);
+              maxNodesPerRack, results, avoidStaleNodes, storageType);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+            maxNodesPerRack, results, avoidStaleNodes, storageType);
       }
     }
   }
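
chooseLocalRack is a three-level fallback: the local rack, then the rack of another node already in the results, then anywhere under NodeBase.ROOT. A compact sketch of the cascade (chooseFrom is a hypothetical stand-in for chooseRandom):

    import java.util.List;

    public class RackCascade {
      static class NotEnoughReplicasException extends Exception {}

      // Stand-in for chooseRandom(scope, ...); fails for "/rackA" to
      // force the fallback path.
      static String chooseFrom(String scope) throws NotEnoughReplicasException {
        if (scope.equals("/rackA")) {
          throw new NotEnoughReplicasException();
        }
        return "dn@" + scope;
      }

      static String chooseLocalRack(String localRack, List<String> chosenRacks) {
        try {
          return chooseFrom(localRack);
        } catch (NotEnoughReplicasException e1) {
          // Retry on the rack of a node chosen earlier, if any.
          for (String other : chosenRacks) {
            if (!other.equals(localRack)) {
              try {
                return chooseFrom(other);
              } catch (NotEnoughReplicasException e2) {
                break; // fall through to the cluster-wide choice
              }
            }
          }
          try {
            return chooseFrom("/"); // NodeBase.ROOT: anywhere in the cluster
          } catch (NotEnoughReplicasException e3) {
            return null;
          }
        }
      }

      public static void main(String[] args) {
        System.out.println(chooseLocalRack("/rackA",
            java.util.Arrays.asList("/rackB"))); // dn@/rackB
      }
    }
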
@@ -423,48 +441,51 @@ public class BlockPlacementPolicyDefault
                                 Set<Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results,
-                                boolean avoidStaleNodes)
+                                List<DatanodeStorageInfo> results,
+                                boolean avoidStaleNodes,
+                                StorageType storageType)
                                     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
       chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
           excludedNodes, blocksize, maxReplicasPerRack, results,
-          avoidStaleNodes);
+          avoidStaleNodes, storageType);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results, avoidStaleNodes);
+                   maxReplicasPerRack, results, avoidStaleNodes, storageType);
     }
   }
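
chooseRemoteRack mirrors that: first ask for targets anywhere outside the local rack (the "~" scope), and on failure ask the local rack only for the shortfall, computed from how far the result list actually grew. A toy sketch of that accounting:

    import java.util.ArrayList;
    import java.util.List;

    public class RemoteRackSketch {
      static class NotEnoughReplicasException extends Exception {}

      // Toy chooser: any scope can supply only one target here.
      static void chooseRandom(int n, String scope, List<String> results)
          throws NotEnoughReplicasException {
        results.add("dn@" + scope);
        if (n > 1) {
          throw new NotEnoughReplicasException();
        }
      }

      public static void main(String[] args) throws Exception {
        List<String> results = new ArrayList<String>();
        int numOfReplicas = 2;
        int oldNumOfReplicas = results.size();
        try {
          chooseRandom(numOfReplicas, "~/rackA", results); // anywhere but /rackA
        } catch (NotEnoughReplicasException e) {
          // Ask the local rack only for what the remote scope could not supply.
          chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
              "/rackA", results);
        }
        System.out.println(results); // [dn@~/rackA, dn@/rackA]
      }
    }
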
 
   /**
    * Randomly choose one target from the given <i>scope</i>.
-   * @return the chosen node, if there is any.
+   * @return the chosen storage, if there is any.
    */
-  protected DatanodeDescriptor chooseRandom(String scope,
+  protected DatanodeStorageInfo chooseRandom(String scope,
       Set<Node> excludedNodes,
       long blocksize,
       int maxNodesPerRack,
-      List<DatanodeDescriptor> results,
-      boolean avoidStaleNodes)
+      List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes,
+      StorageType storageType)
           throws NotEnoughReplicasException {
     return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
-        results, avoidStaleNodes);
+        results, avoidStaleNodes, storageType);
   }
 
   /**
    * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
-   * @return the first chosen node, if there is any.
+   * @return the first chosen storage, if there is any.
    */
-  protected DatanodeDescriptor chooseRandom(int numOfReplicas,
+  protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
                             String scope,
                             Set<Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
-                            List<DatanodeDescriptor> results,
-                            boolean avoidStaleNodes)
+                            List<DatanodeStorageInfo> results,
+                            boolean avoidStaleNodes,
+                            StorageType storageType)
                                 throws NotEnoughReplicasException {
       
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
@@ -476,24 +497,32 @@ public class BlockPlacementPolicyDefault
       builder.append("[");
     }
     boolean badTarget = false;
-    DatanodeDescriptor firstChosen = null;
+    DatanodeStorageInfo firstChosen = null;
     while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
       DatanodeDescriptor chosenNode = 
           (DatanodeDescriptor)clusterMap.chooseRandom(scope);
       if (excludedNodes.add(chosenNode)) { //was not in the excluded list
         numOfAvailableNodes--;
 
-        int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
-            blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes);
-        if (newExcludedNodes >= 0) {
-          numOfReplicas--;
-          if (firstChosen == null) {
-            firstChosen = chosenNode;
+        final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
+            chosenNode.getStorageInfos());
+        int i;
+        for(i = 0; i < storages.length; i++) {
+          final int newExcludedNodes = addIfIsGoodTarget(storages[i],
+              excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
+              avoidStaleNodes, storageType);
+          if (newExcludedNodes >= 0) {
+            numOfReplicas--;
+            if (firstChosen == null) {
+              firstChosen = storages[i];
+            }
+            numOfAvailableNodes -= newExcludedNodes;
+            break;
           }
-          numOfAvailableNodes -= newExcludedNodes;
-        } else {
-          badTarget = true;
         }
+
+        // badTarget is true iff no usable storage was found on this DN.
+        badTarget = (i == storages.length);
       }
     }
       
@@ -512,43 +541,46 @@ public class BlockPlacementPolicyDefault
   }
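
Inside chooseRandom, each randomly chosen datanode is now expanded into its shuffled storages, at most one of which is taken; badTarget records whether the last examined node offered no usable storage. A sketch of the per-node loop, with a stand-in for the addIfIsGoodTarget contract (>= 0 means accepted, -1 means rejected):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class PerNodeStorageLoop {
      // Stand-in for addIfIsGoodTarget: returns the count of newly excluded
      // nodes on success, -1 on rejection.
      static int addIfIsGoodTarget(String storage, List<String> results) {
        if (storage.endsWith("ro")) {
          return -1;
        }
        results.add(storage);
        return 1; // this sketch only ever excludes the node itself
      }

      public static void main(String[] args) {
        List<String> results = new java.util.ArrayList<String>();
        String[] storages = { "DS-1-ro", "DS-2", "DS-3" };
        List<String> shuffled = Arrays.asList(storages.clone());
        Collections.shuffle(shuffled);

        int i;
        for (i = 0; i < shuffled.size(); i++) {
          if (addIfIsGoodTarget(shuffled.get(i), results) >= 0) {
            break; // at most one storage per chosen datanode
          }
        }
        // true iff every storage on this datanode was rejected
        boolean badTarget = (i == shuffled.size());
        System.out.println(results + " badTarget=" + badTarget);
      }
    }
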
 
   /**
-   * If the given node is a good target, add it to the result list and
+   * If the given storage is a good target, add it to the result list and
    * update the set of excluded nodes.
-   * @return -1 if the given is not a good target;
+   * @return -1 if the given storage is not a good target;
    *         otherwise, return the number of nodes added to excludedNodes set.
    */
-  int addIfIsGoodTarget(DatanodeDescriptor node,
+  int addIfIsGoodTarget(DatanodeStorageInfo storage,
       Set<Node> excludedNodes,
       long blockSize,
       int maxNodesPerRack,
       boolean considerLoad,
-      List<DatanodeDescriptor> results,
-      boolean avoidStaleNodes) {
-    if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
-        results, avoidStaleNodes)) {
-      results.add(node);
+      List<DatanodeStorageInfo> results,                           
+      boolean avoidStaleNodes,
+      StorageType storageType) {
+    if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad,
+        results, avoidStaleNodes, storageType)) {
+      results.add(storage);
       // add node and related nodes to excludedNode
-      return addToExcludedNodes(node, excludedNodes);
+      return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
     } else { 
       return -1;
     }
   }
 
-  private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) {
+  private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
     if (LOG.isDebugEnabled()) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       // build the error message for later use.
       debugLoggingBuilder.get()
           .append(node).append(": ")
-          .append("Node ").append(NodeBase.getPath(node))
+          .append("Storage ").append(storage)
+          .append("at node ").append(NodeBase.getPath(node))
           .append(" is not chosen because ")
           .append(reason);
     }
   }
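
logNodeIsNotChosen appends to debugLoggingBuilder, which (judging from the .get() call) is a per-thread StringBuilder that batches rejection reasons into one debug line. A sketch of that ThreadLocal-buffer pattern, under the assumption that the buffer is printed and reset once per placement attempt:

    public class DebugBuilderSketch {
      // Per-thread buffer for cheaply accumulating "not chosen" reasons.
      private static final ThreadLocal<StringBuilder> BUILDER =
          new ThreadLocal<StringBuilder>() {
            @Override protected StringBuilder initialValue() {
              return new StringBuilder();
            }
          };

      static void logNotChosen(String storage, String reason) {
        BUILDER.get().append(storage).append(" is not chosen because ")
            .append(reason).append("; ");
      }

      public static void main(String[] args) {
        logNotChosen("DS-1", "storage is read-only");
        logNotChosen("DS-2", "the node is too busy");
        System.out.println(BUILDER.get().toString());
        BUILDER.get().setLength(0); // reset for the next placement attempt
      }
    }
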
 
   /**
-   * Determine if a node is a good target. 
+   * Determine if a storage is a good target. 
    * 
-   * @param node The target node
+   * @param storage The target storage
    * @param blockSize Size of block
    * @param maxTargetPerRack Maximum number of targets per rack. The value of 
    *                       this parameter depends on the number of racks in 
@@ -561,32 +593,43 @@ public class BlockPlacementPolicyDefault
    *         does not have too much load, 
    *         and the rack does not have too many nodes.
    */
-  private boolean isGoodTarget(DatanodeDescriptor node,
+  private boolean isGoodTarget(DatanodeStorageInfo storage,
                                long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
-                               List<DatanodeDescriptor> results,                           
-                               boolean avoidStaleNodes) {
-    // check if the node is (being) decommissed
+                               List<DatanodeStorageInfo> results,
+                               boolean avoidStaleNodes,
+                               StorageType storageType) {
+    if (storage.getStorageType() != storageType) {
+      logNodeIsNotChosen(storage,
+          "storage types do not match, where the expected storage type is "
+              + storageType);
+      return false;
+    }
+    if (storage.getState() == State.READ_ONLY) {
+      logNodeIsNotChosen(storage, "storage is read-only");
+      return false;
+    }
+    DatanodeDescriptor node = storage.getDatanodeDescriptor();
+    // check if the node is (being) decommissioned
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logNodeIsNotChosen(node, "the node is (being) decommissioned ");
+      logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
       return false;
     }
 
     if (avoidStaleNodes) {
       if (node.isStale(this.staleInterval)) {
-        logNodeIsNotChosen(node, "the node is stale ");
+        logNodeIsNotChosen(storage, "the node is stale ");
         return false;
       }
     }
     
-    long remaining = node.getRemaining() - 
-                     (node.getBlocksScheduled() * blockSize); 
-    // check the remaining capacity of the target machine
-    if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
-      logNodeIsNotChosen(node, "the node does not have enough space ");
+    final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
+    final long scheduledSize = blockSize * node.getBlocksScheduled();
+    if (requiredSize > node.getRemaining() - scheduledSize) {
+      logNodeIsNotChosen(storage, "the node does not have enough space ");
       return false;
     }
-      
+
     // check the communication traffic of the target machine
     if (considerLoad) {
       double avgLoad = 0;
@@ -595,7 +638,7 @@ public class BlockPlacementPolicyDefault
         avgLoad = (double)stats.getTotalLoad()/size;
       }
       if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logNodeIsNotChosen(node, "the node is too busy ");
+        logNodeIsNotChosen(storage, "the node is too busy ");
         return false;
       }
     }
@@ -603,13 +646,14 @@ public class BlockPlacementPolicyDefault
     // check if the target rack has chosen too many nodes
     String rackname = node.getNetworkLocation();
     int counter=1;
-    for(Node result : results) {
-      if (rackname.equals(result.getNetworkLocation())) {
+    for(DatanodeStorageInfo resultStorage : results) {
+      if (rackname.equals(
+          resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
         counter++;
       }
     }
     if (counter>maxTargetPerRack) {
-      logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
+      logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
       return false;
     }
     return true;
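
The reworked space check subtracts the bytes already promised to scheduled blocks before comparing against MIN_BLOCKS_FOR_WRITE blocks' worth of space. A worked example, assuming MIN_BLOCKS_FOR_WRITE is 1 as in HdfsConstants (treat that value as an assumption here):

    public class SpaceCheckExample {
      static final int MIN_BLOCKS_FOR_WRITE = 1; // assumed HdfsConstants value

      static boolean hasEnoughSpace(long blockSize, long remaining,
          int blocksScheduled) {
        final long requiredSize = blockSize * MIN_BLOCKS_FOR_WRITE;
        final long scheduledSize = blockSize * blocksScheduled;
        // Space already promised to in-flight writes is not available.
        return requiredSize <= remaining - scheduledSize;
      }

      public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // 128 MB
        long remaining = 300L * 1024 * 1024;  // 300 MB free
        // 2 scheduled blocks reserve 256 MB, leaving 44 MB: rejected.
        System.out.println(hasEnoughSpace(blockSize, remaining, 2)); // false
        System.out.println(hasEnoughSpace(blockSize, remaining, 1)); // true
      }
    }
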
@@ -621,37 +665,40 @@ public class BlockPlacementPolicyDefault
   * starts from the writer and traverses all <i>nodes</i>.
    * This is basically a traveling salesman problem.
    */
-  private DatanodeDescriptor[] getPipeline(Node writer,
-                                           DatanodeDescriptor[] nodes) {
-    if (nodes.length==0) return nodes;
-      
+  private DatanodeStorageInfo[] getPipeline(Node writer,
+      DatanodeStorageInfo[] storages) {
+    if (storages.length == 0) {
+      return storages;
+    }
+
     synchronized(clusterMap) {
       int index=0;
       if (writer == null || !clusterMap.contains(writer)) {
-        writer = nodes[0];
+        writer = storages[0].getDatanodeDescriptor();
       }
-      for(;index<nodes.length; index++) {
-        DatanodeDescriptor shortestNode = nodes[index];
-        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+      for(; index < storages.length; index++) {
+        DatanodeStorageInfo shortestStorage = storages[index];
+        int shortestDistance = clusterMap.getDistance(writer,
+            shortestStorage.getDatanodeDescriptor());
         int shortestIndex = index;
-        for(int i=index+1; i<nodes.length; i++) {
-          DatanodeDescriptor currentNode = nodes[i];
-          int currentDistance = clusterMap.getDistance(writer, currentNode);
+        for(int i = index + 1; i < storages.length; i++) {
+          int currentDistance = clusterMap.getDistance(writer,
+              storages[i].getDatanodeDescriptor());
           if (shortestDistance>currentDistance) {
             shortestDistance = currentDistance;
-            shortestNode = currentNode;
+            shortestStorage = storages[i];
             shortestIndex = i;
           }
         }
         //switch position index & shortestIndex
         if (index != shortestIndex) {
-          nodes[shortestIndex] = nodes[index];
-          nodes[index] = shortestNode;
+          storages[shortestIndex] = storages[index];
+          storages[index] = shortestStorage;
         }
-        writer = shortestNode;
+        writer = shortestStorage.getDatanodeDescriptor();
       }
     }
-    return nodes;
+    return storages;
   }
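
getPipeline is a greedy nearest-neighbor selection sort: at each position it swaps in the storage whose datanode is closest to the previous hop, then walks from it. A self-contained sketch with integer ids and a toy distance standing in for clusterMap.getDistance:

    import java.util.Arrays;

    public class PipelineSort {
      // Toy metric: absolute difference of ids, standing in for
      // NetworkTopology.getDistance(writer, node).
      static int distance(int a, int b) {
        return Math.abs(a - b);
      }

      static int[] getPipeline(int writer, int[] nodes) {
        for (int index = 0; index < nodes.length; index++) {
          int shortestIndex = index;
          for (int i = index + 1; i < nodes.length; i++) {
            if (distance(writer, nodes[i])
                < distance(writer, nodes[shortestIndex])) {
              shortestIndex = i;
            }
          }
          // Swap the nearest remaining node into position, then walk from it.
          int tmp = nodes[index];
          nodes[index] = nodes[shortestIndex];
          nodes[shortestIndex] = tmp;
          writer = nodes[index];
        }
        return nodes;
      }

      public static void main(String[] args) {
        // Writer at 0; pipeline visits 2, then 5, then 9 - nearest first.
        System.out.println(Arrays.toString(getPipeline(0, new int[] { 9, 2, 5 })));
      }
    }
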
 
   @Override


