hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r1615844 [3/5] - in /hadoop/common/branches/fs-encryption/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/src/main/j...
Date Tue, 05 Aug 2014 02:31:00 GMT
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug  5 02:30:54 2014
@@ -1082,6 +1082,7 @@ public class BlockManager {
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
+   * @param storageID if known, null otherwise.
    * @param reason a textual reason why the block should be marked corrupt,
    * for logging purposes
    */
@@ -1098,19 +1099,29 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
-        blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
-        dn, storageID);
-  }
 
-  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
-      DatanodeInfo dn, String storageID) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot mark " + b
+      throw new IOException("Cannot mark " + blk
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
+    
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+            blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+        storageID == null ? null : node.getStorageInfo(storageID),
+        node);
+  }
+
+  /**
+   * 
+   * @param b
+   * @param storageInfo storage that contains the block, if known. null otherwise.
+   * @throws IOException
+   */
+  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+      DatanodeStorageInfo storageInfo,
+      DatanodeDescriptor node) throws IOException {
 
     BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
@@ -1121,7 +1132,9 @@ public class BlockManager {
     } 
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(storageID, b.stored);
+    if (storageInfo != null) {
+      storageInfo.addBlock(b.stored);
+    }
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
@@ -1460,7 +1473,7 @@ public class BlockManager {
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      List, boolean, Set, long)
+   *      List, boolean, Set, long, StorageType)
    */
   public DatanodeStorageInfo[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1697,7 +1710,7 @@ public class BlockManager {
    * @throws IOException
    */
   public boolean processReport(final DatanodeID nodeID,
-      final DatanodeStorage storage, final String poolId,
+      final DatanodeStorage storage,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
@@ -1729,9 +1742,9 @@ public class BlockManager {
       if (storageInfo.numBlocks() == 0) {
         // The first block report can be processed a lot more efficiently than
         // ordinary block reports.  This shortens restart times.
-        processFirstBlockReport(node, storage.getStorageID(), newReport);
+        processFirstBlockReport(storageInfo, newReport);
       } else {
-        processReport(node, storage, newReport);
+        processReport(storageInfo, newReport);
       }
       
       // Now that we have an up-to-date block report, we know that any
@@ -1793,9 +1806,8 @@ public class BlockManager {
     }
   }
   
-  private void processReport(final DatanodeDescriptor node,
-      final DatanodeStorage storage,
-      final BlockListAsLongs report) throws IOException {
+  private void processReport(final DatanodeStorageInfo storageInfo,
+                             final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -1805,19 +1817,20 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, storage, report,
+    reportDiff(storageInfo, report,
         toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+   
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storage.getStorageID());
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1831,7 +1844,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storage.getStorageID());
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -1842,16 +1855,16 @@ public class BlockManager {
    * a toRemove list (since there won't be any).  It also silently discards 
    * any invalid blocks, thereby deferring their processing until 
    * the next block report.
-   * @param node - DatanodeDescriptor of the node that sent the report
+   * @param storageInfo - DatanodeStorageInfo that sent the report
    * @param report - the initial block report, to be processed
    * @throws IOException 
    */
-  private void processFirstBlockReport(final DatanodeDescriptor node,
-      final String storageID,
+  private void processFirstBlockReport(
+      final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
-    assert (node.getStorageInfo(storageID).numBlocks() == 0);
+    assert (storageInfo.numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
     while(itBR.hasNext()) {
@@ -1860,7 +1873,7 @@ public class BlockManager {
       
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
-        queueReportedBlock(node, storageID, iblk, reportedState,
+        queueReportedBlock(storageInfo, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
@@ -1872,15 +1885,16 @@ public class BlockManager {
       // If block is corrupt, mark it and continue to next block.
       BlockUCState ucState = storedBlock.getBlockUCState();
       BlockToMarkCorrupt c = checkReplicaCorrupt(
-          iblk, reportedState, storedBlock, ucState, node);
+          iblk, reportedState, storedBlock, ucState,
+          storageInfo.getDatanodeDescriptor());
       if (c != null) {
         if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
-          queueReportedBlock(node, storageID, iblk, reportedState,
+          queueReportedBlock(storageInfo, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c, node, storageID);
+          markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
         }
         continue;
       }
@@ -1888,7 +1902,7 @@ public class BlockManager {
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            node.getStorageInfo(storageID), iblk, reportedState);
+            storageInfo, iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
@@ -1901,12 +1915,12 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, node, storageID);
+        addStoredBlockImmediate(storedBlock, storageInfo);
       }
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, 
+  private void reportDiff(DatanodeStorageInfo storageInfo, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
@@ -1914,8 +1928,6 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
-
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -1932,7 +1944,7 @@ public class BlockManager {
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+      BlockInfo storedBlock = processReportedBlock(storageInfo,
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 
       // move block to the head of the list
@@ -1969,7 +1981,7 @@ public class BlockManager {
    * BlockInfoUnderConstruction's list of replicas.</li>
    * </ol>
    * 
-   * @param dn descriptor for the datanode that made the report
+   * @param storageInfo DatanodeStorageInfo that sent the report.
    * @param block reported block replica
    * @param reportedState reported replica state
    * @param toAdd add to DatanodeDescriptor
@@ -1981,14 +1993,16 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
-      final String storageID,
+  private BlockInfo processReportedBlock(
+      final DatanodeStorageInfo storageInfo,
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
     
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("Reported block " + block
           + " on " + dn + " size " + block.getNumBytes()
@@ -1997,7 +2011,7 @@ public class BlockManager {
   
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(dn, storageID, block, reportedState,
+      queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
@@ -2037,7 +2051,7 @@ public class BlockManager {
         // TODO: Pretty confident this should be s/storedBlock/block below,
         // since we should be postponing the info of the reported block, not
         // the stored block. See HDFS-6289 for more context.
-        queueReportedBlock(dn, storageID, storedBlock, reportedState,
+        queueReportedBlock(storageInfo, storedBlock, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
       } else {
         toCorrupt.add(c);
@@ -2066,17 +2080,17 @@ public class BlockManager {
    * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
-  private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queueing reported block " + block +
           " in state " + reportedState + 
-          " from datanode " + dn + " for later processing " +
-          "because " + reason + ".");
+          " from datanode " + storageInfo.getDatanodeDescriptor() +
+          " for later processing because " + reason + ".");
     }
-    pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
+    pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
   }
 
   /**
@@ -2099,7 +2113,7 @@ public class BlockManager {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing previouly queued message " + rbi);
       }
-      processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), 
+      processAndHandleReportedBlock(rbi.getStorageInfo(), 
           rbi.getBlock(), rbi.getReportedState(), null);
     }
   }
@@ -2156,6 +2170,16 @@ public class BlockManager {
         } else {
           return null; // not corrupt
         }
+      case UNDER_CONSTRUCTION:
+        if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
+          final long reportedGS = reported.getGenerationStamp();
+          return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
+              + ucState + " and reported state " + reportedState
+              + ", But reported genstamp " + reportedGS
+              + " does not match genstamp in block map "
+              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+        }
+        return null;
       default:
         return null;
       }
@@ -2219,19 +2243,20 @@ public class BlockManager {
   }
 
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
-      DatanodeDescriptor node, String storageID) throws IOException {
+      DatanodeStorageInfo storageInfo) throws IOException {
     BlockInfoUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
-        ucBlock.reportedBlock, ucBlock.reportedState);
+    block.addReplicaIfNotPresent(
+        storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
 
-    if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
-      addStoredBlock(block, node, storageID, null, true);
+    if (ucBlock.reportedState == ReplicaState.FINALIZED &&
+        block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
+      addStoredBlock(block, storageInfo, null, true);
     }
   } 
 
   /**
    * Faster version of
-   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
+   * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
    * , intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock(). Assumes this method is
    * called "immediately" so there is no need to refresh the storedBlock from
@@ -2242,17 +2267,17 @@ public class BlockManager {
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
-      DatanodeDescriptor node, String storageID)
+      DatanodeStorageInfo storageInfo)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, node, storageID, null, false);
+      addStoredBlock(storedBlock, storageInfo, null, false);
       return;
     }
 
     // just add it
-    node.addBlock(storageID, storedBlock);
+    storageInfo.addBlock(storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2274,13 +2299,13 @@ public class BlockManager {
    * @return the block that is stored in blockMap.
    */
   private Block addStoredBlock(final BlockInfo block,
-                               DatanodeDescriptor node,
-                               String storageID,
+                               DatanodeStorageInfo storageInfo,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
       storedBlock = blocksMap.getStoredBlock(block);
@@ -2300,7 +2325,7 @@ public class BlockManager {
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = node.addBlock(storageID, storedBlock);
+    boolean added = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
     if (added) {
@@ -2829,12 +2854,15 @@ public class BlockManager {
     } else {
       final String[] datanodeUuids = new String[locations.size()];
       final String[] storageIDs = new String[datanodeUuids.length];
+      final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
       for(int i = 0; i < locations.size(); i++) {
         final DatanodeStorageInfo s = locations.get(i);
         datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
         storageIDs[i] = s.getStorageID();
+        storageTypes[i] = s.getStorageType();
       }
-      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
+      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
+          storageTypes));
       return block.getNumBytes();
     }
   }
@@ -2843,8 +2871,9 @@ public class BlockManager {
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
+  void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
       throws IOException {
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
     // RECEIVED_BLOCK), we currently also decrease the approximate number. 
@@ -2864,12 +2893,12 @@ public class BlockManager {
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.decrement(block, node);
-    processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
+    processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
   
-  private void processAndHandleReportedBlock(DatanodeDescriptor node,
-      String storageID, Block block,
+  private void processAndHandleReportedBlock(
+      DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
@@ -2877,7 +2906,9 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, storageID, block, reportedState,
+    final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+
+    processReportedBlock(storageInfo, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2885,11 +2916,11 @@ public class BlockManager {
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storageID);
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2903,7 +2934,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storageID);
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -2930,13 +2961,15 @@ public class BlockManager {
           "Got incremental block report from unregistered or dead node");
     }
 
-    if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+    DatanodeStorageInfo storageInfo =
+        node.getStorageInfo(srdb.getStorage().getStorageID());
+    if (storageInfo == null) {
       // The DataNode is reporting an unknown storage. Usually the NN learns
       // about new storages from heartbeats but during NN restart we may
       // receive a block report or incremental report before the heartbeat.
       // We must handle this for protocol compatibility. This issue was
       // uncovered by HDFS-6094.
-      node.updateStorage(srdb.getStorage());
+      storageInfo = node.updateStorage(srdb.getStorage());
     }
 
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@@ -2946,14 +2979,13 @@ public class BlockManager {
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), ReplicaState.RBW, null);
+        processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
+                                      ReplicaState.RBW, null);
         break;
       default:
         String msg = 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Aug  5 02:30:54 2014
@@ -260,8 +260,8 @@ public class DatanodeDescriptor extends 
   }
 
   public StorageReport[] getStorageReports() {
-    final StorageReport[] reports = new StorageReport[storageMap.size()];
     final DatanodeStorageInfo[] infos = getStorageInfos();
+    final StorageReport[] reports = new StorageReport[infos.length];
     for(int i = 0; i < infos.length; i++) {
       reports[i] = infos[i].toStorageReport();
     }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Tue Aug  5 02:30:54 2014
@@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  boolean addBlock(BlockInfo b) {
+  public boolean addBlock(BlockInfo b) {
     if(!b.addStorage(this))
       return false;
     // add to the head of the data-node list

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Tue Aug  5 02:30:54 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 
 /**
  * In the Standby Node, we can receive messages about blocks
@@ -41,14 +42,12 @@ class PendingDataNodeMessages {
     
   static class ReportedBlockInfo {
     private final Block block;
-    private final DatanodeDescriptor dn;
-    private final String storageID;
+    private final DatanodeStorageInfo storageInfo;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
+    ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
         ReplicaState reportedState) {
-      this.dn = dn;
-      this.storageID = storageID;
+      this.storageInfo = storageInfo;
       this.block = block;
       this.reportedState = reportedState;
     }
@@ -57,21 +56,18 @@ class PendingDataNodeMessages {
       return block;
     }
 
-    DatanodeDescriptor getNode() {
-      return dn;
-    }
-    
-    String getStorageID() {
-      return storageID;
-    }
-
     ReplicaState getReportedState() {
       return reportedState;
     }
+    
+    DatanodeStorageInfo getStorageInfo() {
+      return storageInfo;
+    }
 
     @Override
     public String toString() {
-      return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+      return "ReportedBlockInfo [block=" + block + ", dn="
+          + storageInfo.getDatanodeDescriptor()
           + ", reportedState=" + reportedState + "]";
     }
   }
@@ -87,7 +83,7 @@ class PendingDataNodeMessages {
       Queue<ReportedBlockInfo> oldQueue = entry.getValue();
       while (!oldQueue.isEmpty()) {
         ReportedBlockInfo rbi = oldQueue.remove();
-        if (!rbi.getNode().equals(dn)) {
+        if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
           newQueue.add(rbi);
         } else {
           count--;
@@ -97,11 +93,11 @@ class PendingDataNodeMessages {
     }
   }
   
-  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, storageID, block, reportedState));
+        new ReportedBlockInfo(storageInfo, block, reportedState));
     count++;
   }
   

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Aug  5 02:30:54 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
    */
   private long lastActiveClaimTxId = -1;
 
+  private final ReentrantReadWriteLock mReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock mReadLock  = mReadWriteLock.readLock();
+  private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    mReadLock.lock();
+  }
+
+  void readUnlock() {
+    mReadLock.unlock();
+  }
+
+  void writeLock() {
+    mWriteLock.lock();
+  }
+
+  void writeUnlock() {
+    mWriteLock.unlock();
+  }
+
   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
@@ -135,14 +160,19 @@ class BPOfferService {
     }
     return false;
   }
-  
-  synchronized String getBlockPoolId() {
-    if (bpNSInfo != null) {
-      return bpNSInfo.getBlockPoolID();
-    } else {
-      LOG.warn("Block pool ID needed, but service not yet registered with NN",
-          new Exception("trace"));
-      return null;
+
+  String getBlockPoolId() {
+    readLock();
+    try {
+      if (bpNSInfo != null) {
+        return bpNSInfo.getBlockPoolID();
+      } else {
+        LOG.warn("Block pool ID needed, but service not yet registered with NN",
+            new Exception("trace"));
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -150,27 +180,37 @@ class BPOfferService {
     return getNamespaceInfo() != null;
   }
 
-  synchronized NamespaceInfo getNamespaceInfo() {
-    return bpNSInfo;
+  NamespaceInfo getNamespaceInfo() {
+    readLock();
+    try {
+      return bpNSInfo;
+    } finally {
+      readUnlock();
+    }
   }
 
   @Override
-  public synchronized String toString() {
-    if (bpNSInfo == null) {
-      // If we haven't yet connected to our NN, we don't yet know our
-      // own block pool ID.
-      // If _none_ of the block pools have connected yet, we don't even
-      // know the DatanodeID ID of this DN.
-      String datanodeUuid = dn.getDatanodeUuid();
+  public String toString() {
+    readLock();
+    try {
+      if (bpNSInfo == null) {
+        // If we haven't yet connected to our NN, we don't yet know our
+        // own block pool ID.
+        // If _none_ of the block pools have connected yet, we don't even
+        // know the DatanodeID ID of this DN.
+        String datanodeUuid = dn.getDatanodeUuid();
 
-      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
-        datanodeUuid = "unassigned";
+        if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+          datanodeUuid = "unassigned";
+        }
+        return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+      } else {
+        return "Block pool " + getBlockPoolId() +
+            " (Datanode Uuid " + dn.getDatanodeUuid() +
+            ")";
       }
-      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
-    } else {
-      return "Block pool " + getBlockPoolId() +
-          " (Datanode Uuid " + dn.getDatanodeUuid() +
-          ")";
+    } finally {
+      readUnlock();
     }
   }
   
@@ -266,32 +306,37 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
-    if (this.bpNSInfo == null) {
-      this.bpNSInfo = nsInfo;
-      boolean success = false;
-
-      // Now that we know the namespace ID, etc, we can pass this to the DN.
-      // The DN can now initialize its local storage if we are the
-      // first BP to handshake, etc.
-      try {
-        dn.initBlockPool(this);
-        success = true;
-      } finally {
-        if (!success) {
-          // The datanode failed to initialize the BP. We need to reset
-          // the namespace info so that other BPService actors still have
-          // a chance to set it, and re-initialize the datanode.
-          this.bpNSInfo = null;
+  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    writeLock();
+    try {
+      if (this.bpNSInfo == null) {
+        this.bpNSInfo = nsInfo;
+        boolean success = false;
+
+        // Now that we know the namespace ID, etc, we can pass this to the DN.
+        // The DN can now initialize its local storage if we are the
+        // first BP to handshake, etc.
+        try {
+          dn.initBlockPool(this);
+          success = true;
+        } finally {
+          if (!success) {
+            // The datanode failed to initialize the BP. We need to reset
+            // the namespace info so that other BPService actors still have
+            // a chance to set it, and re-initialize the datanode.
+            this.bpNSInfo = null;
+          }
         }
+      } else {
+        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+            "Blockpool ID");
+        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+            "Namespace ID");
+        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+            "Cluster ID");
       }
-    } else {
-      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
-          "Blockpool ID");
-      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
-          "Namespace ID");
-      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
-          "Cluster ID");
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -300,22 +345,27 @@ class BPOfferService {
    * NN, it calls this function to verify that the NN it connected to
    * is consistent with other NNs serving the block-pool.
    */
-  synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+  void registrationSucceeded(BPServiceActor bpServiceActor,
       DatanodeRegistration reg) throws IOException {
-    if (bpRegistration != null) {
-      checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
-          reg.getStorageInfo().getNamespaceID(), "namespace ID");
-      checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
-          reg.getStorageInfo().getClusterID(), "cluster ID");
-    } else {
-      bpRegistration = reg;
-    }
-    
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-    // Add the initial block token secret keys to the DN's secret manager.
-    if (dn.isBlockTokenEnabled) {
-      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
-          reg.getExportedKeys());
+    writeLock();
+    try {
+      if (bpRegistration != null) {
+        checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+            reg.getStorageInfo().getNamespaceID(), "namespace ID");
+        checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+            reg.getStorageInfo().getClusterID(), "cluster ID");
+      } else {
+        bpRegistration = reg;
+      }
+
+      dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+      // Add the initial block token secret keys to the DN's secret manager.
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+            reg.getExportedKeys());
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -333,25 +383,35 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
-    Preconditions.checkState(bpNSInfo != null,
-        "getRegistration() can only be called after initial handshake");
-    return dn.createBPRegistration(bpNSInfo);
+  DatanodeRegistration createRegistration() {
+    writeLock();
+    try {
+      Preconditions.checkState(bpNSInfo != null,
+          "getRegistration() can only be called after initial handshake");
+      return dn.createBPRegistration(bpNSInfo);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Called when an actor shuts down. If this is the last actor
    * to shut down, shuts down the whole blockpool in the DN.
    */
-  synchronized void shutdownActor(BPServiceActor actor) {
-    if (bpServiceToActive == actor) {
-      bpServiceToActive = null;
-    }
+  void shutdownActor(BPServiceActor actor) {
+    writeLock();
+    try {
+      if (bpServiceToActive == actor) {
+        bpServiceToActive = null;
+      }
 
-    bpServices.remove(actor);
+      bpServices.remove(actor);
 
-    if (bpServices.isEmpty()) {
-      dn.shutdownBlockPool(this);
+      if (bpServices.isEmpty()) {
+        dn.shutdownBlockPool(this);
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -392,11 +452,16 @@ class BPOfferService {
    * @return a proxy to the active NN, or null if the BPOS has not
    * acknowledged any NN as active yet.
    */
-  synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
-    if (bpServiceToActive != null) {
-      return bpServiceToActive.bpNamenode;
-    } else {
-      return null;
+  DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+    readLock();
+    try {
+      if (bpServiceToActive != null) {
+        return bpServiceToActive.bpNamenode;
+      } else {
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -424,45 +489,50 @@ class BPOfferService {
    * @param actor the actor which received the heartbeat
    * @param nnHaState the HA-related heartbeat contents
    */
-  synchronized void updateActorStatesFromHeartbeat(
+  void updateActorStatesFromHeartbeat(
       BPServiceActor actor,
       NNHAStatusHeartbeat nnHaState) {
-    final long txid = nnHaState.getTxId();
-    
-    final boolean nnClaimsActive =
-      nnHaState.getState() == HAServiceState.ACTIVE;
-    final boolean bposThinksActive = bpServiceToActive == actor;
-    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; 
-    
-    if (nnClaimsActive && !bposThinksActive) {
-      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
-          "txid=" + txid);
-      if (!isMoreRecentClaim) {
-        // Split-brain scenario - an NN is trying to claim active
-        // state when a different NN has already claimed it with a higher
-        // txid.
-        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
-            txid + " but there was already a more recent claim at txid=" +
-            lastActiveClaimTxId);
-        return;
-      } else {
-        if (bpServiceToActive == null) {
-          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+    writeLock();
+    try {
+      final long txid = nnHaState.getTxId();
+
+      final boolean nnClaimsActive =
+          nnHaState.getState() == HAServiceState.ACTIVE;
+      final boolean bposThinksActive = bpServiceToActive == actor;
+      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+      if (nnClaimsActive && !bposThinksActive) {
+        LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+            "txid=" + txid);
+        if (!isMoreRecentClaim) {
+          // Split-brain scenario - an NN is trying to claim active
+          // state when a different NN has already claimed it with a higher
+          // txid.
+          LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+              txid + " but there was already a more recent claim at txid=" +
+              lastActiveClaimTxId);
+          return;
         } else {
-          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
-              bpServiceToActive + " at higher txid=" + txid);
+          if (bpServiceToActive == null) {
+            LOG.info("Acknowledging ACTIVE Namenode " + actor);
+          } else {
+            LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+                bpServiceToActive + " at higher txid=" + txid);
+          }
+          bpServiceToActive = actor;
         }
-        bpServiceToActive = actor;
+      } else if (!nnClaimsActive && bposThinksActive) {
+        LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+            "txid=" + nnHaState.getTxId());
+        bpServiceToActive = null;
       }
-    } else if (!nnClaimsActive && bposThinksActive) {
-      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
-          "txid=" + nnHaState.getTxId());
-      bpServiceToActive = null;
-    }
-    
-    if (bpServiceToActive == actor) {
-      assert txid >= lastActiveClaimTxId;
-      lastActiveClaimTxId = txid;
+
+      if (bpServiceToActive == actor) {
+        assert txid >= lastActiveClaimTxId;
+        lastActiveClaimTxId = txid;
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -533,12 +603,15 @@ class BPOfferService {
       actor.reRegister();
       return true;
     }
-    synchronized (this) {
+    writeLock();
+    try {
       if (actor == bpServiceToActive) {
         return processCommandFromActive(cmd, actor);
       } else {
         return processCommandFromStandby(cmd, actor);
       }
+    } finally {
+      writeUnlock();
     }
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Aug  5 02:30:54 2014
@@ -152,7 +152,7 @@ public class BlockPoolSliceStorage exten
     // During startup some of them can upgrade or roll back
     // while others could be up-to-date for the regular startup.
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(getStorageDir(idx), nsInfo, startOpt);
+      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert getCTime() == nsInfo.getCTime() 
           : "Data-node and name-node CTimes must be the same.";
     }
@@ -242,7 +242,7 @@ public class BlockPoolSliceStorage exten
    * @param startOpt startup option
    * @throws IOException
    */
-  private void doTransition(StorageDirectory sd,
+  private void doTransition(DataNode datanode, StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
@@ -275,7 +275,7 @@ public class BlockPoolSliceStorage exten
     }
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
-      doUpgrade(sd, nsInfo); // upgrade
+      doUpgrade(datanode, sd, nsInfo); // upgrade
       return;
     }
     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -304,7 +304,8 @@ public class BlockPoolSliceStorage exten
    * @param nsInfo Namespace Info from the namenode
    * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException {
+  void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo)
+      throws IOException {
     // Upgrading is applicable only to release with federation or after
     if (!DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -312,7 +313,7 @@ public class BlockPoolSliceStorage exten
     }
     LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
         + ".\n   old LV = " + this.getLayoutVersion() + "; old CTime = "
-        + this.getCTime() + ".\n   new LV = " + nsInfo.getLayoutVersion()
+        + this.getCTime() + ".\n   new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION
         + "; new CTime = " + nsInfo.getCTime());
     // get <SD>/previous directory
     String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
@@ -340,7 +341,7 @@ public class BlockPoolSliceStorage exten
     rename(bpCurDir, bpTmpDir);
     
     // 3. Create new <SD>/current with block files hardlinks and VERSION
-    linkAllBlocks(bpTmpDir, bpCurDir);
+    linkAllBlocks(datanode, bpTmpDir, bpCurDir);
     this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID() 
         : "Data-node and name-node layout versions must be the same.";
@@ -517,14 +518,15 @@ public class BlockPoolSliceStorage exten
    * @param toDir the current data directory
    * @throws IOException if error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+  private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
+      throws IOException {
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
     // hardlink finalized blocks in tmpDir
     HardLink hardLink = new HardLink();
-    DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), 
+    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
       new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
-    DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW), 
+    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW),
         new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     LOG.info( hardLink.linkStats.report() );
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Aug  5 02:30:54 2014
@@ -253,7 +253,7 @@ class BlockReceiver implements Closeable
       
       if (cause != null) { // possible disk error
         ioe = cause;
-        datanode.checkDiskError();
+        datanode.checkDiskErrorAsync();
       }
       
       throw ioe;
@@ -329,7 +329,7 @@ class BlockReceiver implements Closeable
     }
     // disk check
     if(ioe != null) {
-      datanode.checkDiskError();
+      datanode.checkDiskErrorAsync();
       throw ioe;
     }
   }
@@ -639,7 +639,7 @@ class BlockReceiver implements Closeable
           manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
-        datanode.checkDiskError();
+        datanode.checkDiskErrorAsync();
         throw iex;
       }
     }
@@ -1208,7 +1208,7 @@ class BlockReceiver implements Closeable
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
-            datanode.checkDiskError();
+            datanode.checkDiskErrorAsync();
             LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Aug  5 02:30:54 2014
@@ -1075,6 +1075,11 @@ public class DataNode extends Configured
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
     initStorage(nsInfo);
+
+    // Exclude failed disks before initializing the block pools to avoid startup
+    // failures.
+    checkDiskError();
+
     initPeriodicScanners(conf);
     
     data.addBlockPool(nsInfo.getBlockPoolID(), conf);
@@ -1510,9 +1515,9 @@ public class DataNode extends Configured
   
   
   /**
-   *  Check if there is a disk failure and if so, handle the error
+   * Check if there is a disk failure asynchronously and if so, handle the error
    */
-  public void checkDiskError() {
+  public void checkDiskErrorAsync() {
     synchronized(checkDiskErrorMutex) {
       checkDiskErrorFlag = true;
       if(checkDiskErrorThread == null) {
@@ -1821,7 +1826,7 @@ public class DataNode extends Configured
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
             targets[0] + " got ", ie);
         // check if there are any disk problem
-        checkDiskError();
+        checkDiskErrorAsync();
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
@@ -2759,7 +2764,18 @@ public class DataNode extends Configured
   public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
-  
+
+  /**
+   * Check the disk error
+   */
+  private void checkDiskError() {
+    try {
+      data.checkDataDir();
+    } catch (DiskErrorException de) {
+      handleDiskError(de.getMessage());
+    }
+  }
+
   /**
    * Starts a new thread which will check for disk error check request 
    * every 5 sec
@@ -2776,9 +2792,7 @@ public class DataNode extends Configured
               }
               if(tempFlag) {
                 try {
-                  data.checkDataDir();
-                } catch (DiskErrorException de) {
-                  handleDiskError(de.getMessage());
+                  checkDiskError();
                 } catch (Exception e) {
                   LOG.warn("Unexpected exception occurred while checking disk error  " + e);
                   checkDiskErrorThread = null;

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java Tue Aug  5 02:30:54 2014
@@ -62,7 +62,10 @@ public class DataNodeLayoutVersion {  
    * </ul>
    */
   public static enum Feature implements LayoutFeature {
-    FIRST_LAYOUT(-55, -53, "First datanode layout", false);
+    FIRST_LAYOUT(-55, -53, "First datanode layout", false),
+    BLOCKID_BASED_LAYOUT(-56,
+        "The block ID of a finalized block uniquely determines its position " +
+            "in the directory structure");
    
     private final FeatureInfo info;
 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Aug  5 02:30:54 2014
@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -35,13 +41,30 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /** 
  * Data storage information file.
@@ -261,6 +284,7 @@ public class DataStorage extends Storage
           STORAGE_DIR_CURRENT));
       bpDataDirs.add(bpRoot);
     }
+
     // mkdir for the list of BlockPoolStorage
     makeBlockPoolDataDir(bpDataDirs, null);
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@@ -488,7 +512,7 @@ public class DataStorage extends Storage
     
     // do upgrade
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
-      doUpgrade(sd, nsInfo);  // upgrade
+      doUpgrade(datanode, sd, nsInfo);  // upgrade
       return;
     }
     
@@ -523,7 +547,8 @@ public class DataStorage extends Storage
    * @param sd  storage directory
    * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+  void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
+      throws IOException {
     // If the existing on-disk layout version supportes federation, simply
     // update its layout version.
     if (DataNodeLayoutVersion.supports(
@@ -568,7 +593,8 @@ public class DataStorage extends Storage
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), 
         nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(curDir, nsInfo);
-    linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+    linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
+        STORAGE_DIR_CURRENT));
     
     // 4. Write version file under <SD>/current
     layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@@ -746,22 +772,22 @@ public class DataStorage extends Storage
    *
    * @throws IOException If error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
-      throws IOException {
+  private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
+      File toDir) throws IOException {
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
     if (DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
       // hardlink finalized blocks in tmpDir/finalized
-      linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
           new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
       // hardlink rbw blocks in tmpDir/rbw
-      linkBlocks(new File(fromDir, STORAGE_DIR_RBW), 
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
           new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     } else { // pre-RBW version
       // hardlink finalized blocks in tmpDir
-      linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), 
+      linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
           diskLayoutVersion, hardLink);      
       if (fromBbwDir.exists()) {
         /*
@@ -770,15 +796,67 @@ public class DataStorage extends Storage
          * NOT underneath the 'current' directory in those releases.  See
          * HDFS-3731 for details.
          */
-        linkBlocks(fromBbwDir,
+        linkBlocks(datanode, fromBbwDir,
             new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
       }
     } 
     LOG.info( hardLink.linkStats.report() );
   }
+
+  private static class LinkArgs {
+    public File src;
+    public File dst;
+
+    public LinkArgs(File src, File dst) {
+      this.src = src;
+      this.dst = dst;
+    }
+  }
+
+  static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
+      HardLink hl) throws IOException {
+    boolean upgradeToIdBasedLayout = false;
+    // If we are upgrading from a version older than the one where we introduced
+    // block ID-based layout AND we're working with the finalized directory,
+    // we'll need to upgrade from the old flat layout to the block ID-based one
+    if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
+        getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
+      upgradeToIdBasedLayout = true;
+    }
+
+    final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
+    linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
+        idBasedLayoutSingleLinks);
+    int numLinkWorkers = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
+    ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
+    final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
+    List<Future<Void>> futures = Lists.newArrayList();
+    for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
+      final int iCopy = i;
+      futures.add(linkWorkers.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          int upperBound = Math.min(iCopy + step,
+              idBasedLayoutSingleLinks.size());
+          for (int j = iCopy; j < upperBound; j++) {
+            LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+            NativeIO.link(cur.src, cur.dst);
+          }
+          return null;
+        }
+      }));
+    }
+    linkWorkers.shutdown();
+    for (Future<Void> f : futures) {
+      Futures.get(f, IOException.class);
+    }
+  }
   
-  static void linkBlocks(File from, File to, int oldLV, HardLink hl) 
-  throws IOException {
+  static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
+  boolean upgradeToIdBasedLayout, File blockRoot,
+      List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
     if (!from.exists()) {
       return;
     }
@@ -805,9 +883,6 @@ public class DataStorage extends Storage
     // from is a directory
     hl.linkStats.countDirs++;
     
-    if (!to.mkdirs())
-      throw new IOException("Cannot create directory " + to);
-    
     String[] blockNames = from.list(new java.io.FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
@@ -815,12 +890,36 @@ public class DataStorage extends Storage
       }
     });
 
+    // If we are upgrading to block ID-based layout, we don't want to recreate
+    // any subdirs from the source that contain blocks, since we have a new
+    // directory structure
+    if (!upgradeToIdBasedLayout || !to.getName().startsWith(
+        BLOCK_SUBDIR_PREFIX)) {
+      if (!to.mkdirs())
+        throw new IOException("Cannot create directory " + to);
+    }
+
     // Block files just need hard links with the same file names
     // but a different directory
     if (blockNames.length > 0) {
-      HardLink.createHardLinkMult(from, blockNames, to);
-      hl.linkStats.countMultLinks++;
-      hl.linkStats.countFilesMultLinks += blockNames.length;
+      if (upgradeToIdBasedLayout) {
+        for (String blockName : blockNames) {
+          long blockId = Block.getBlockId(blockName);
+          File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
+          if (!blockLocation.exists()) {
+            if (!blockLocation.mkdirs()) {
+              throw new IOException("Failed to mkdirs " + blockLocation);
+            }
+          }
+          idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
+              new File(blockLocation, blockName)));
+          hl.linkStats.countSingleLinks++;
+        }
+      } else {
+        HardLink.createHardLinkMult(from, blockNames, to);
+        hl.linkStats.countMultLinks++;
+        hl.linkStats.countFilesMultLinks += blockNames.length;
+      }
     } else {
       hl.linkStats.countEmptyDirs++;
     }
@@ -834,8 +933,9 @@ public class DataStorage extends Storage
         }
       });
     for(int i = 0; i < otherNames.length; i++)
-      linkBlocks(new File(from, otherNames[i]), 
-          new File(to, otherNames[i]), oldLV, hl);
+      linkBlocksHelper(new File(from, otherNames[i]),
+          new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
+          blockRoot, idBasedLayoutSingleLinks);
   }
 
   /**

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Tue Aug  5 02:30:54 2014
@@ -30,6 +30,8 @@ public class DatanodeUtil {
 
   public static final String DISK_ERROR = "Possible disk error: ";
 
+  private static final String SEP = System.getProperty("file.separator");
+
   /** Get the cause of an I/O exception if caused by a possible disk error
    * @param ioe an I/O exception
    * @return cause if the I/O exception is caused by a possible disk error;
@@ -78,4 +80,38 @@ public class DatanodeUtil {
   public static File getUnlinkTmpFile(File f) {
     return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
   }
+
+  /**
+   * Checks whether there are any files anywhere in the directory tree rooted
+   * at dir (directories don't count as files). dir must exist
+   * @return true if there are no files
+   * @throws IOException if unable to list subdirectories
+   */
+  public static boolean dirNoFilesRecursive(File dir) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents == null) {
+      throw new IOException("Cannot list contents of " + dir);
+    }
+    for (File f : contents) {
+      if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the directory where a finalized block with this ID should be stored.
+   * Do not attempt to create the directory.
+   * @param root the root directory where finalized blocks are stored
+   * @param blockId
+   * @return
+   */
+  public static File idToBlockDir(File root, long blockId) {
+    int d1 = (int)((blockId >> 16) & 0xff);
+    int d2 = (int)((blockId >> 8) & 0xff);
+    String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+    return new File(root, path);
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Tue Aug  5 02:30:54 2014
@@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend
   private File baseDir;
   
   /**
-   * Ints representing the sub directory path from base dir to the directory
-   * containing this replica.
+   * Whether or not this replica's parent directory includes subdirs, in which
+   * case we can generate them based on the replica's block ID
    */
-  private int[] subDirs;
+  private boolean hasSubdirs;
   
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
@@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend
    * @return the parent directory path where this replica is located
    */
   File getDir() {
-    if (subDirs == null) {
-      return null;
-    }
-
-    StringBuilder sb = new StringBuilder();
-    for (int i : subDirs) {
-      sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
-      sb.append(i);
-      sb.append("/");
-    }
-    File ret = new File(baseDir, sb.toString());
-    return ret;
+    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+        getBlockId()) : baseDir;
   }
 
   /**
@@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend
 
   private void setDirInternal(File dir) {
     if (dir == null) {
-      subDirs = null;
       baseDir = null;
       return;
     }
 
-    ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
-    this.subDirs = replicaDirInfo.subDirs;
+    ReplicaDirInfo dirInfo = parseBaseDir(dir);
+    this.hasSubdirs = dirInfo.hasSubidrs;
     
     synchronized (internedBaseDirs) {
-      if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
         // Create a new String path of this file and make a brand new File object
         // to guarantee we drop the reference to the underlying char[] storage.
-        File baseDir = new File(replicaDirInfo.baseDirPath);
-        internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+        File baseDir = new File(dirInfo.baseDirPath);
+        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
       }
-      this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
     }
   }
-  
+
   @VisibleForTesting
   public static class ReplicaDirInfo {
-    @VisibleForTesting
     public String baseDirPath;
-    
-    @VisibleForTesting
-    public int[] subDirs;
+    public boolean hasSubidrs;
+
+    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
+      this.baseDirPath = baseDirPath;
+      this.hasSubidrs = hasSubidrs;
+    }
   }
   
   @VisibleForTesting
-  public static ReplicaDirInfo parseSubDirs(File dir) {
-    ReplicaDirInfo ret = new ReplicaDirInfo();
+  public static ReplicaDirInfo parseBaseDir(File dir) {
     
     File currentDir = dir;
-    List<Integer> subDirList = new ArrayList<Integer>();
+    boolean hasSubdirs = false;
     while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
-      // Prepend the integer into the list.
-      subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
-          DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+      hasSubdirs = true;
       currentDir = currentDir.getParentFile();
     }
-    ret.subDirs = new int[subDirList.size()];
-    for (int i = 0; i < subDirList.size(); i++) {
-      ret.subDirs[i] = subDirList.get(i);
-    }
-    
-    ret.baseDirPath = currentDir.getAbsolutePath();
     
-    return ret;
+    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
   }
 
   /**

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Tue Aug  5 02:30:54 2014
@@ -59,7 +59,8 @@ class BlockPoolSlice {
   private final String bpid;
   private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
   private final File currentDir; // StorageDirectory/current/bpid/current
-  private final LDir finalizedDir; // directory store Finalized replica
+  // directory where finalized replicas are stored
+  private final File finalizedDir;
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private static final String DU_CACHE_FILE = "dfsUsed";
@@ -82,8 +83,13 @@ class BlockPoolSlice {
     this.bpid = bpid;
     this.volume = volume;
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
-    final File finalizedDir = new File(
+    this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+    if (!this.finalizedDir.exists()) {
+      if (!this.finalizedDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + this.finalizedDir);
+      }
+    }
 
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
@@ -95,10 +101,6 @@ class BlockPoolSlice {
       FileUtil.fullyDelete(tmpDir);
     }
     this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
-    final int maxBlocksPerDir = conf.getInt(
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
-    this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
     if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
       if (!rbwDir.isDirectory()) {
         throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -131,7 +133,7 @@ class BlockPoolSlice {
   }
 
   File getFinalizedDir() {
-    return finalizedDir.dir;
+    return finalizedDir;
   }
   
   File getRbwDir() {
@@ -239,26 +241,57 @@ class BlockPoolSlice {
   }
 
   File addBlock(Block b, File f) throws IOException {
-    File blockFile = finalizedDir.addBlock(b, f);
+    File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+    if (!blockDir.exists()) {
+      if (!blockDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + blockDir);
+      }
+    }
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     return blockFile;
   }
     
   void checkDirs() throws DiskErrorException {
-    finalizedDir.checkDirTree();
+    DiskChecker.checkDirs(finalizedDir);
     DiskChecker.checkDir(tmpDir);
     DiskChecker.checkDir(rbwDir);
   }
     
   void getVolumeMap(ReplicaMap volumeMap) throws IOException {
     // add finalized replicas
-    finalizedDir.getVolumeMap(bpid, volumeMap, volume);
+    addToReplicasMap(volumeMap, finalizedDir, true);
     // add rbw replicas
     addToReplicasMap(volumeMap, rbwDir, false);
   }
 
   /**
+   * Recover an unlinked tmp file on datanode restart. If the original block
+   * does not exist, then the tmp file is renamed to be the
+   * original file name and the original name is returned; otherwise the tmp
+   * file is deleted and null is returned.
+   */
+  File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+    File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+    if (blockFile.exists()) {
+      // If the original block file still exists, then no recovery is needed.
+      if (!unlinkedTmp.delete()) {
+        throw new IOException("Unable to cleanup unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return null;
+    } else {
+      if (!unlinkedTmp.renameTo(blockFile)) {
+        throw new IOException("Unable to rename unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return blockFile;
+    }
+  }
+
+
+  /**
    * Add replicas under the given directory to the volume map
    * @param volumeMap the replicas map
    * @param dir an input directory
@@ -267,23 +300,34 @@ class BlockPoolSlice {
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
       ) throws IOException {
-    File blockFiles[] = FileUtil.listFiles(dir);
-    for (File blockFile : blockFiles) {
-      if (!Block.isBlockFilename(blockFile))
+    File files[] = FileUtil.listFiles(dir);
+    for (File file : files) {
+      if (file.isDirectory()) {
+        addToReplicasMap(volumeMap, file, isFinalized);
+      }
+
+      if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+        file = recoverTempUnlinkedBlock(file);
+        if (file == null) { // the original block still exists, so we cover it
+          // in another iteration and can continue here
+          continue;
+        }
+      }
+      if (!Block.isBlockFilename(file))
         continue;
       
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          blockFiles, blockFile);
-      long blockId = Block.filename2id(blockFile.getName());
+          files, file);
+      long blockId = Block.filename2id(file.getName());
       ReplicaInfo newReplica = null;
       if (isFinalized) {
         newReplica = new FinalizedReplica(blockId, 
-            blockFile.length(), genStamp, volume, blockFile.getParentFile());
+            file.length(), genStamp, volume, file.getParentFile());
       } else {
 
         boolean loadRwr = true;
-        File restartMeta = new File(blockFile.getParent()  +
-            File.pathSeparator + "." + blockFile.getName() + ".restart");
+        File restartMeta = new File(file.getParent()  +
+            File.pathSeparator + "." + file.getName() + ".restart");
         Scanner sc = null;
         try {
           sc = new Scanner(restartMeta);
@@ -291,8 +335,8 @@ class BlockPoolSlice {
           if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
             // It didn't expire. Load the replica as a RBW.
             newReplica = new ReplicaBeingWritten(blockId,
-                validateIntegrityAndSetLength(blockFile, genStamp), 
-                genStamp, volume, blockFile.getParentFile(), null);
+                validateIntegrityAndSetLength(file, genStamp),
+                genStamp, volume, file.getParentFile(), null);
             loadRwr = false;
           }
           sc.close();
@@ -301,7 +345,7 @@ class BlockPoolSlice {
               restartMeta.getPath());
           }
         } catch (FileNotFoundException fnfe) {
-          // nothing to do here
+          // nothing to do hereFile dir =
         } finally {
           if (sc != null) {
             sc.close();
@@ -310,15 +354,15 @@ class BlockPoolSlice {
         // Restart meta doesn't exist or expired.
         if (loadRwr) {
           newReplica = new ReplicaWaitingToBeRecovered(blockId,
-              validateIntegrityAndSetLength(blockFile, genStamp), 
-              genStamp, volume, blockFile.getParentFile());
+              validateIntegrityAndSetLength(file, genStamp),
+              genStamp, volume, file.getParentFile());
         }
       }
 
       ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
       if (oldReplica != null) {
         FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
-            "on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
+            "on disk: " + oldReplica.getBlockFile() + " and " + file );
       }
     }
   }
@@ -405,10 +449,6 @@ class BlockPoolSlice {
     }
   }
     
-  void clearPath(File f) {
-    finalizedDir.clearPath(f);
-  }
-    
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Aug  5 02:30:54 2014
@@ -1151,7 +1151,7 @@ class FsDatasetImpl implements FsDataset
         return f;
    
       // if file is not null, but doesn't exist - possibly disk failed
-      datanode.checkDiskError();
+      datanode.checkDiskErrorAsync();
     }
     
     if (LOG.isDebugEnabled()) {
@@ -1224,13 +1224,6 @@ class FsDatasetImpl implements FsDataset
               +  ". Parent not found for file " + f);
           continue;
         }
-        ReplicaState replicaState = info.getState();
-        if (replicaState == ReplicaState.FINALIZED || 
-            (replicaState == ReplicaState.RUR && 
-                ((ReplicaUnderRecovery)info).getOriginalReplica().getState() == 
-                  ReplicaState.FINALIZED)) {
-          v.clearPath(bpid, parent);
-        }
         volumeMap.remove(bpid, invalidBlks[i]);
       }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Tue Aug  5 02:30:54 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -235,10 +236,6 @@ class FsVolumeImpl implements FsVolumeSp
     // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     bp.addToReplicasMap(volumeMap, dir, isFinalized);
   }
-  
-  void clearPath(String bpid, File f) throws IOException {
-    getBlockPoolSlice(bpid).clearPath(f);
-  }
 
   @Override
   public String toString() {
@@ -274,7 +271,8 @@ class FsVolumeImpl implements FsVolumeSp
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-    if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
+    if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
+        finalizedDir)) {
       return false;
     }
     if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
@@ -301,7 +299,8 @@ class FsVolumeImpl implements FsVolumeSp
       if (!rbwDir.delete()) {
         throw new IOException("Failed to delete " + rbwDir);
       }
-      if (!finalizedDir.delete()) {
+      if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
+          !FileUtil.fullyDelete(finalizedDir)) {
         throw new IOException("Failed to delete " + finalizedDir);
       }
       FileUtil.fullyDelete(tmpDir);

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Aug  5 02:30:54 2014
@@ -1103,9 +1103,6 @@ public class FSDirectory implements Clos
       count++;
     }
     
-    // update inodeMap
-    removeFromInodeMap(Arrays.asList(allSrcInodes));
-    
     trgInode.setModificationTime(timestamp, trgLatestSnapshot);
     trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug  5 02:30:54 2014
@@ -4585,8 +4585,11 @@ public class FSNamesystem implements Nam
           // Otherwise fsck will report these blocks as MISSING, especially if the
           // blocksReceived from Datanodes take a long time to arrive.
           for (int i = 0; i < trimmedTargets.size(); i++) {
-            trimmedTargets.get(i).addBlock(
-              trimmedStorages.get(i), storedBlock);
+            DatanodeStorageInfo storageInfo =
+                trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+            if (storageInfo != null) {
+              storageInfo.addBlock(storedBlock);
+            }
           }
         }
 
@@ -6066,7 +6069,7 @@ public class FSNamesystem implements Nam
   }
 
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final StorageReceivedDeletedBlocks srdb)
+      final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     writeLock();
     try {
@@ -8824,6 +8827,29 @@ public class FSNamesystem implements Nam
     }
   }
 
+  void checkAccess(String src, FsAction mode) throws AccessControlException,
+      FileNotFoundException, UnresolvedLinkException, IOException {
+    checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      if (dir.getINode(src) == null) {
+        throw new FileNotFoundException("Path not found");
+      }
+      if (isPermissionEnabled) {
+        FSPermissionChecker pc = getPermissionChecker();
+        checkPathAccess(pc, src, mode);
+      }
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "checkAccess", src);
+      throw e;
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the



Mime
View raw message