hadoop-hdfs-commits mailing list archives

From a..@apache.org
Subject svn commit: r1556076 [4/8] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org...
Date Mon, 06 Jan 2014 23:58:36 GMT
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Mon Jan  6 23:58:33 2014
@@ -236,6 +236,8 @@ public abstract class Storage extends St
     final boolean useLock;        // flag to enable storage lock
     final StorageDirType dirType; // storage dir type
     FileLock lock;                // storage lock
+
+    private String storageUuid = null;      // Storage directory identifier.
     
     public StorageDirectory(File dir) {
       // default dirType is null
@@ -246,6 +248,14 @@ public abstract class Storage extends St
       this(dir, dirType, true);
     }
     
+    public void setStorageUuid(String storageUuid) {
+      this.storageUuid = storageUuid;
+    }
+
+    public String getStorageUuid() {
+      return storageUuid;
+    }
+
     /**
      * Constructor
      * @param dir directory corresponding to the storage

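The hunk above gives each StorageDirectory its own identifier, separate from the node-wide storage ID that earlier releases used. For illustration only (not part of this commit), a minimal standalone sketch of the assign-once pattern the new accessors support; the "DS-" prefix mirrors what DatanodeStorage.generateUuid() is assumed to produce:

import java.io.File;
import java.util.UUID;

// Standalone model of the new per-directory identifier on StorageDirectory.
class StorageDirectorySketch {
  private final File root;
  private String storageUuid = null;   // assigned once, then stable across restarts

  StorageDirectorySketch(File root) { this.root = root; }

  void setStorageUuid(String uuid) { this.storageUuid = uuid; }
  String getStorageUuid() { return storageUuid; }

  public static void main(String[] args) {
    StorageDirectorySketch sd = new StorageDirectorySketch(new File("/data/dn1"));
    // Mirrors DataStorage.createStorageID(sd) further down: generate only if unset.
    if (sd.getStorageUuid() == null) {
      sd.setStorageUuid("DS-" + UUID.randomUUID());
    }
    System.out.println(sd.root + " -> " + sd.getStorageUuid());
  }
}
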
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Mon Jan  6 23:58:33 2014
@@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteA
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -159,31 +160,32 @@ class BPOfferService {
   synchronized NamespaceInfo getNamespaceInfo() {
     return bpNSInfo;
   }
-  
+
   @Override
   public synchronized String toString() {
     if (bpNSInfo == null) {
       // If we haven't yet connected to our NN, we don't yet know our
       // own block pool ID.
       // If _none_ of the block pools have connected yet, we don't even
-      // know the storage ID of this DN.
-      String storageId = dn.getStorageId();
-      if (storageId == null || "".equals(storageId)) {
-        storageId = "unknown";
+      // know the Datanode UUID of this DN.
+      String datanodeUuid = dn.getDatanodeUuid();
+
+      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+        datanodeUuid = "unassigned";
       }
-      return "Block pool <registering> (storage id " + storageId +
-        ")";
+      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
     } else {
       return "Block pool " + getBlockPoolId() +
-        " (storage id " + dn.getStorageId() +
-        ")";
+          " (Datanode Uuid " + dn.getDatanodeUuid() +
+          ")";
     }
   }
   
-  void reportBadBlocks(ExtendedBlock block) {
+  void reportBadBlocks(ExtendedBlock block,
+                       String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block);
+      actor.reportBadBlocks(block, storageUuid, storageType);
     }
   }
   
@@ -192,7 +194,8 @@ class BPOfferService {
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     checkBlock(block);
     checkDelHint(delHint);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@@ -201,7 +204,7 @@ class BPOfferService {
         delHint);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -218,23 +221,23 @@ class BPOfferService {
         "delHint is null");
   }
 
-  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeDeletedBlock(bInfo);
+      actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
     }
   }
   
-  void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -337,7 +340,7 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
+  synchronized DatanodeRegistration createRegistration() throws IOException {
     Preconditions.checkState(bpNSInfo != null,
         "getRegistration() can only be called after initial handshake");
     return dn.createBPRegistration(bpNSInfo);

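The BPOfferService changes thread a storage UUID (and, for bad blocks, a StorageType) through every per-block notification before fanning it out to one BPServiceActor per NameNode. A standalone sketch of that fan-out shape, for illustration only; Actor and the long block ID stand in for the real BPServiceActor and ExtendedBlock:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Standalone model of the notify-all-actors pattern in BPOfferService.
class FanOutSketch {
  interface Actor {                     // stand-in for BPServiceActor
    void notifyBlock(long blockId, String storageUuid);
  }

  private final List<Actor> actors = new CopyOnWriteArrayList<>();

  void addActor(Actor a) { actors.add(a); }

  // Mirrors notifyNamenodeReceivedBlock(block, delHint, storageUuid): the
  // storage UUID rides along so each NameNode learns which disk of this
  // DataNode holds the replica.
  void notifyNamenodeReceivedBlock(long blockId, String storageUuid) {
    for (Actor actor : actors) {
      actor.notifyBlock(blockId, storageUuid);
    }
  }

  public static void main(String[] args) {
    FanOutSketch bpos = new FanOutSketch();
    bpos.addActor((id, uuid) -> System.out.println("active NN told: " + id + " on " + uuid));
    bpos.addActor((id, uuid) -> System.out.println("standby NN told: " + id + " on " + uuid));
    bpos.notifyNamenodeReceivedBlock(1001L, "DS-example");
  }
}
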
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Mon Jan  6 23:58:33 2014
@@ -22,7 +22,6 @@ import static org.apache.hadoop.util.Tim
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
 
@@ -30,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
@@ -97,9 +96,9 @@ class BPServiceActor implements Runnable
    * keyed by block ID, contains the pending changes which have yet to be
    * reported to the NN. Access should be synchronized on this object.
    */
-  private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR 
-    = Maps.newHashMap();
-  
+  private final Map<String, PerStoragePendingIncrementalBR>
+      pendingIncrementalBRperStorage = Maps.newHashMap();
+
   private volatile int pendingReceivedRequests = 0;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
@@ -241,12 +240,15 @@ class BPServiceActor implements Runnable
     resetBlockReportTime = true; // reset future BRs for randomness
   }
 
-  void reportBadBlocks(ExtendedBlock block) {
+  void reportBadBlocks(ExtendedBlock block,
+      String storageUuid, StorageType storageType) {
     if (bpRegistration == null) {
       return;
     }
     DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
+    String[] uuids = { storageUuid };
+    StorageType[] types = { storageType };
+    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
     
     try {
       bpNamenode.reportBadBlocks(blocks);  
@@ -260,69 +262,120 @@ class BPServiceActor implements Runnable
   }
   
   /**
-   * Report received blocks and delete hints to the Namenode
-   * 
+   * Report received blocks and delete hints to the Namenode for each
+   * storage.
+   *
    * @throws IOException
    */
   private void reportReceivedDeletedBlocks() throws IOException {
 
-    // check if there are newly received blocks
-    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
-    synchronized (pendingIncrementalBR) {
-      int numBlocks = pendingIncrementalBR.size();
-      if (numBlocks > 0) {
-        //
-        // Send newly-received and deleted blockids to namenode
-        //
-        receivedAndDeletedBlockArray = pendingIncrementalBR
-            .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+    // Generate a list of the pending reports for each storage under the lock
+    Map<String, ReceivedDeletedBlockInfo[]> blockArrays = Maps.newHashMap();
+    synchronized (pendingIncrementalBRperStorage) {
+      for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+           pendingIncrementalBRperStorage.entrySet()) {
+        final String storageUuid = entry.getKey();
+        final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
+
+        if (perStorageMap.getBlockInfoCount() > 0) {
+          // Send newly-received and deleted blockids to namenode
+          ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
+          pendingReceivedRequests =
+              (pendingReceivedRequests > rdbi.length ?
+                  (pendingReceivedRequests - rdbi.length) : 0);
+          blockArrays.put(storageUuid, rdbi);
+        }
       }
-      pendingIncrementalBR.clear();
     }
-    if (receivedAndDeletedBlockArray != null) {
+
+    // Send incremental block reports to the Namenode outside the lock
+    for (Map.Entry<String, ReceivedDeletedBlockInfo[]> entry :
+         blockArrays.entrySet()) {
+      final String storageUuid = entry.getKey();
+      final ReceivedDeletedBlockInfo[] rdbi = entry.getValue();
+
       StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
-          bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
+          storageUuid, rdbi) };
       boolean success = false;
       try {
-        bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
-            report);
+        bpNamenode.blockReceivedAndDeleted(bpRegistration,
+            bpos.getBlockPoolId(), report);
         success = true;
       } finally {
-        synchronized (pendingIncrementalBR) {
-          if (!success) {
+        if (!success) {
+          synchronized (pendingIncrementalBRperStorage) {
             // If we didn't succeed in sending the report, put all of the
-            // blocks back onto our queue, but only in the case where we didn't
-            // put something newer in the meantime.
-            for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
-              if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
-                pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
-              }
-            }
+            // blocks back onto our queue, but only in the case where we
+            // didn't put something newer in the meantime.
+            PerStoragePendingIncrementalBR perStorageMap =
+                pendingIncrementalBRperStorage.get(storageUuid);
+            pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi);
           }
-          pendingReceivedRequests = pendingIncrementalBR.size();
         }
       }
     }
   }
 
+  /**
+   * Retrieve the incremental BR state for a given storage UUID.
+   * @param storageUuid the storage to look up
+   * @return the pending incremental BR state for that storage
+   */
+  private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
+      String storageUuid) {
+    PerStoragePendingIncrementalBR mapForStorage =
+        pendingIncrementalBRperStorage.get(storageUuid);
+
+    if (mapForStorage == null) {
+      // This is the first time we are adding incremental BR state for
+      // this storage so create a new map. This is required once per
+      // storage, per service actor.
+      mapForStorage = new PerStoragePendingIncrementalBR();
+      pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
+    }
+
+    return mapForStorage;
+  }
+
+  /**
+   * Add a blockInfo for notification to NameNode. If another entry
+   * exists for the same block it is removed.
+   *
+   * Caller must synchronize access using pendingIncrementalBRperStorage.
+   * @param bInfo the block state change to report
+   * @param storageUuid the storage that holds the block
+   */
+  void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
+      String storageUuid) {
+    // Make sure another entry for the same block is first removed.
+    // There may only be one such entry.
+    for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+          pendingIncrementalBRperStorage.entrySet()) {
+      if (entry.getValue().removeBlockInfo(bInfo)) {
+        break;
+      }
+    }
+    getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
+  }
+
   /*
    * Informing the name node could take a long long time! Should we wait
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeBlockImmediately(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      addPendingReplicationBlockInfo(bInfo, storageUuid);
       pendingReceivedRequests++;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
     }
   }
 
-  void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeDeletedBlock(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      addPendingReplicationBlockInfo(bInfo, storageUuid);
     }
   }
 
@@ -331,13 +384,13 @@ class BPServiceActor implements Runnable
    */
   @VisibleForTesting
   void triggerBlockReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastBlockReport = 0;
       lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
       while (lastBlockReport == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -347,12 +400,12 @@ class BPServiceActor implements Runnable
   
   @VisibleForTesting
   void triggerHeartbeatForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
       while (lastHeartbeat == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -362,13 +415,13 @@ class BPServiceActor implements Runnable
 
   @VisibleForTesting
   void triggerDeletionReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
       lastDeletedReport = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
 
       while (lastDeletedReport == 0) {
         try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
           return;
         }
@@ -392,23 +445,38 @@ class BPServiceActor implements Runnable
       // a FINALIZED one.
       reportReceivedDeletedBlocks();
 
+      // Send one block report per known storage.
+
       // Create block report
       long brCreateStartTime = now();
-      BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(
-          bpos.getBlockPoolId());
+      long totalBlockCount = 0;
+
+      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+          dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
 
       // Send block report
       long brSendStartTime = now();
-      StorageBlockReport[] report = { new StorageBlockReport(
-          new DatanodeStorage(bpRegistration.getStorageID()),
-          bReport.getBlockListAsLongs()) };
-      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
+      StorageBlockReport[] reports =
+          new StorageBlockReport[perVolumeBlockLists.size()];
+
+      int i = 0;
+      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+        DatanodeStorage dnStorage = kvPair.getKey();
+        BlockListAsLongs blockList = kvPair.getValue();
+        totalBlockCount += blockList.getNumberOfBlocks();
+
+        reports[i++] =
+            new StorageBlockReport(
+              dnStorage, blockList.getBlockListAsLongs());
+      }
+
+      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
 
       // Log the block report processing stats from Datanode perspective
       long brSendCost = now() - brSendStartTime;
       long brCreateCost = brSendStartTime - brCreateStartTime;
       dn.getMetrics().addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+      LOG.info("BlockReport of " + totalBlockCount
           + " blocks took " + brCreateCost + " msec to generate and "
           + brSendCost + " msecs for RPC and NN processing");
 
@@ -434,17 +502,15 @@ class BPServiceActor implements Runnable
   
   
   HeartbeatResponse sendHeartBeat() throws IOException {
+    StorageReport[] reports =
+        dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Sending heartbeat from service actor: " + this);
+      LOG.debug("Sending heartbeat with " + reports.length +
+                " storage reports from service actor: " + this);
     }
-    // reports number of failed volumes
-    StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
-        false,
-        dn.getFSDataset().getCapacity(),
-        dn.getFSDataset().getDfsUsed(),
-        dn.getFSDataset().getRemaining(),
-        dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
-    return bpNamenode.sendHeartbeat(bpRegistration, report,
+
+    return bpNamenode.sendHeartbeat(bpRegistration,
+        reports,
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         dn.getFSDataset().getNumFailedVolumes());
@@ -462,9 +528,9 @@ class BPServiceActor implements Runnable
   }
   
   private String formatThreadName() {
-    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
-    return "DataNode: [" +
-      StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
+    Collection<StorageLocation> dataDirs =
+        DataNode.getStorageLocations(dn.getConf());
+    return "DataNode: [" + dataDirs.toString() + "] " +
       " heartbeating to " + nnAddr;
   }
   
@@ -570,10 +636,10 @@ class BPServiceActor implements Runnable
         //
         long waitTime = dnConf.heartBeatInterval - 
         (Time.now() - lastHeartbeat);
-        synchronized(pendingIncrementalBR) {
+        synchronized(pendingIncrementalBRperStorage) {
           if (waitTime > 0 && pendingReceivedRequests == 0) {
             try {
-              pendingIncrementalBR.wait(waitTime);
+              pendingIncrementalBRperStorage.wait(waitTime);
             } catch (InterruptedException ie) {
               LOG.warn("BPOfferService for " + this + " interrupted");
             }
@@ -744,4 +810,68 @@ class BPServiceActor implements Runnable
     }
   }
 
+  private static class PerStoragePendingIncrementalBR {
+    private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
+        Maps.newHashMap();
+
+    /**
+     * Return the number of blocks on this storage that have pending
+     * incremental block reports.
+     * @return the number of pending incremental block report entries
+     */
+    int getBlockInfoCount() {
+      return pendingIncrementalBR.size();
+    }
+
+    /**
+     * Dequeue and return all pending incremental block report state.
+     * @return an array of the dequeued block report entries
+     */
+    ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
+      ReceivedDeletedBlockInfo[] blockInfos =
+          pendingIncrementalBR.values().toArray(
+              new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
+
+      pendingIncrementalBR.clear();
+      return blockInfos;
+    }
+
+    /**
+     * Add blocks from blockArray to pendingIncrementalBR, unless the
+     * block already exists in pendingIncrementalBR.
+     * @param blockArray list of blocks to add.
+     * @return the number of missing blocks that we added.
+     */
+    int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
+      int blocksPut = 0;
+      for (ReceivedDeletedBlockInfo rdbi : blockArray) {
+        if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
+          pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
+          ++blocksPut;
+        }
+      }
+      return blocksPut;
+    }
+
+    /**
+     * Add pending incremental block report for a single block.
+     * The entry is keyed by the block's ID.
+     * @param blockInfo the report entry to add
+     */
+    void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+      pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
+    }
+
+    /**
+     * Remove pending incremental block report for a single block if it
+     * exists.
+     *
+     * @param blockInfo identifies the block whose entry should be removed
+     * @return true if a report was removed, false if no report existed for
+     *         the given block.
+     */
+    boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+      return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
+    }
+  }
 }

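The rewrite above replaces the actor's single pendingIncrementalBR map with one map per storage, while keeping the old invariant that a block has at most one pending entry anywhere: addPendingReplicationBlockInfo removes any stale entry first, and putMissingBlockInfos re-queues failed reports only when nothing newer arrived in the meantime. A standalone sketch of those two rules, for illustration only, with a String standing in for ReceivedDeletedBlockInfo:

import java.util.HashMap;
import java.util.Map;

// Standalone model of the per-storage pending-IBR bookkeeping.
class PendingIbrSketch {
  // storage UUID -> (block ID -> pending report entry)
  private final Map<String, Map<Long, String>> perStorage = new HashMap<>();

  // Mirrors addPendingReplicationBlockInfo: drop any stale entry for the
  // same block (it may have moved between storages), then record the new one.
  synchronized void add(long blockId, String info, String storageUuid) {
    for (Map<Long, String> m : perStorage.values()) {
      if (m.remove(blockId) != null) {
        break;                          // at most one entry can exist
      }
    }
    perStorage.computeIfAbsent(storageUuid, k -> new HashMap<>())
        .put(blockId, info);
  }

  // Mirrors putMissingBlockInfos: after a failed RPC, re-queue only the
  // blocks that have not been superseded by a newer notification.
  synchronized int requeue(String storageUuid, Map<Long, String> failed) {
    Map<Long, String> m =
        perStorage.computeIfAbsent(storageUuid, k -> new HashMap<>());
    int blocksPut = 0;
    for (Map.Entry<Long, String> e : failed.entrySet()) {
      if (!m.containsKey(e.getKey())) {
        m.put(e.getKey(), e.getValue());
        ++blocksPut;
      }
    }
    return blocksPut;
  }

  public static void main(String[] args) {
    PendingIbrSketch q = new PendingIbrSketch();
    q.add(1001L, "RECEIVING_BLOCK", "DS-1");
    q.add(1001L, "RECEIVED_BLOCK", "DS-2"); // the DS-1 entry is dropped first
    System.out.println(q.perStorage);       // block 1001 now lives only under DS-2
  }
}
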
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Mon Jan  6 23:58:33 2014
@@ -187,7 +187,7 @@ class BlockPoolSliceScanner {
         + hours + " hours for block pool " + bpid);
 
     // get the list of blocks and arrange them in random order
-    List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
+    List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
     Collections.shuffle(arr);
     
     long scanTime = -1;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Jan  6 23:58:33 2014
@@ -162,7 +162,8 @@ class BlockReceiver implements Closeable
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
           replicaInfo = datanode.data.createRbw(block);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
           replicaInfo = datanode.data.recoverRbw(
@@ -176,7 +177,8 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -185,7 +187,8 @@ class BlockReceiver implements Closeable
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
-          datanode.notifyNamenodeReceivingBlock(block);
+          datanode.notifyNamenodeReceivingBlock(
+              block, replicaInfo.getStorageUuid());
           break;
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
@@ -252,6 +255,10 @@ class BlockReceiver implements Closeable
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
 
+  String getStorageUuid() {
+    return replicaInfo.getStorageUuid();
+  }
+
   /**
    * close files.
    */
@@ -1027,7 +1034,7 @@ class BlockReceiver implements Closeable
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
               if (ClientTraceLog.isInfoEnabled() && isClient) {
                 long offset = 0;
                 DatanodeRegistration dnR = 
@@ -1035,7 +1042,7 @@ class BlockReceiver implements Closeable
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                       inAddr, myAddr, block.getNumBytes(),
                       "HDFS_WRITE", clientname, offset,
-                      dnR.getStorageID(), block, endTime-startTime));
+                      dnR.getDatanodeUuid(), block, endTime-startTime));
               } else {
                 LOG.info("Received " + block + " size "
                     + block.getNumBytes() + " from " + inAddr);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jan  6 23:58:33 2014
@@ -17,10 +17,40 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.ObjectName;
+
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,21 +69,42 @@ import org.apache.hadoop.hdfs.HDFSPolicy
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.*;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -62,7 +113,11 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpConfig;
@@ -84,23 +139,21 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import java.io.*;
-import java.net.*;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.management.ObjectName;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -209,7 +262,7 @@ public class DataNode extends Configured
   private JvmPauseMonitor pauseMonitor;
 
   private SecureResources secureResources = null;
-  private AbstractList<File> dataDirs;
+  private List<StorageLocation> dataDirs;
   private Configuration conf;
 
   private final List<String> usersWithLocalPathAccess;
@@ -219,20 +272,11 @@ public class DataNode extends Configured
   private ObjectName dataNodeInfoBeanName;
 
   /**
-   * Create the DataNode given a configuration and an array of dataDirs.
-   * 'dataDirs' is where the blocks are stored.
-   */
-  DataNode(final Configuration conf, 
-           final AbstractList<File> dataDirs) throws IOException {
-    this(conf, dataDirs, null);
-  }
-  
-  /**
    * Create the DataNode given a configuration, an array of dataDirs,
    * and a namenode proxy
    */
-  DataNode(final Configuration conf, 
-           final AbstractList<File> dataDirs,
+  DataNode(final Configuration conf,
+           final List<StorageLocation> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
 
@@ -491,7 +535,7 @@ public class DataNode extends Configured
       directoryScanner.start();
     } else {
       LOG.info("Periodic Directory Tree Verification scan is disabled because " +
-               reason);
+                   reason);
     }
   }
   
@@ -563,10 +607,11 @@ public class DataNode extends Configured
   }
   
   // calls specific to BP
-  protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  protected void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint); 
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
@@ -574,10 +619,11 @@ public class DataNode extends Configured
   }
   
   // calls specific to BP
-  protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  protected void notifyNamenodeReceivingBlock(
+      ExtendedBlock block, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivingBlock(block); 
+      bpos.notifyNamenodeReceivingBlock(block, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
           + block.getBlockPoolId());
@@ -585,10 +631,10 @@ public class DataNode extends Configured
   }
   
   /** Notify the corresponding namenode to delete the block. */
-  public void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if (bpos != null) {
-      bpos.notifyNamenodeDeletedBlock(block);
+      bpos.notifyNamenodeDeletedBlock(block, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
           + block.getBlockPoolId());
@@ -600,7 +646,9 @@ public class DataNode extends Configured
    */
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = getBPOSForBlock(block);
-    bpos.reportBadBlocks(block);
+    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    bpos.reportBadBlocks(
+        block, volume.getStorageID(), volume.getStorageType());
   }
 
   /**
@@ -672,7 +720,7 @@ public class DataNode extends Configured
    * @throws IOException
    */
   void startDataNode(Configuration conf, 
-                     AbstractList<File> dataDirs,
+                     List<StorageLocation> dataDirs,
                     // DatanodeProtocol namenode,
                      SecureResources resources
                      ) throws IOException {
@@ -712,19 +760,42 @@ public class DataNode extends Configured
     readaheadPool = ReadaheadPool.getInstance();
   }
   
+  public static String generateUuid() {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Verify that the DatanodeUuid has been initialized. If this is a new
+   * datanode then we generate a new Datanode Uuid and persist it to disk.
+   *
+   * @throws IOException
+   */
+  private synchronized void checkDatanodeUuid() throws IOException {
+    if (storage.getDatanodeUuid() == null) {
+      storage.setDatanodeUuid(generateUuid());
+      storage.writeAll();
+      LOG.info("Generated and persisted new Datanode UUID " +
+               storage.getDatanodeUuid());
+    }
+  }
+
   /**
    * Create a DatanodeRegistration for a specific block pool.
    * @param nsInfo the namespace info from the first part of the NN handshake
    */
-  DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
+  DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo)
+      throws IOException {
     StorageInfo storageInfo = storage.getBPStorage(nsInfo.getBlockPoolID());
     if (storageInfo == null) {
       // it's null in the case of SimulatedDataSet
       storageInfo = new StorageInfo(nsInfo);
     }
+
+    checkDatanodeUuid();
+
     DatanodeID dnId = new DatanodeID(
         streamingAddr.getAddress().getHostAddress(), hostName, 
-        getStorageId(), getXferPort(), getInfoPort(),
+        storage.getDatanodeUuid(), getXferPort(), getInfoPort(),
             infoSecurePort, getIpcPort());
     return new DatanodeRegistration(dnId, storageInfo, 
         new ExportedBlockKeys(), VersionInfo.getVersion());
@@ -743,16 +814,10 @@ public class DataNode extends Configured
       id = bpRegistration;
     }
 
-    if (storage.getStorageID().equals("")) {
-      // This is a fresh datanode, persist the NN-provided storage ID
-      storage.setStorageID(bpRegistration.getStorageID());
-      storage.writeAll();
-      LOG.info("New storage id " + bpRegistration.getStorageID()
-          + " is assigned to data-node " + bpRegistration);
-    } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
-      throw new IOException("Inconsistent storage IDs. Name-node returned "
-          + bpRegistration.getStorageID() 
-          + ". Expecting " + storage.getStorageID());
+    if(!storage.getDatanodeUuid().equals(bpRegistration.getDatanodeUuid())) {
+      throw new IOException("Inconsistent Datanode IDs. Name-node returned "
+          + bpRegistration.getDatanodeUuid()
+          + ". Expecting " + storage.getDatanodeUuid());
     }
     
     registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
@@ -873,7 +938,7 @@ public class DataNode extends Configured
       final StorageInfo bpStorage = storage.getBPStorage(bpid);
       LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
           + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
-          + ";nsInfo=" + nsInfo);
+          + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
     }
 
     synchronized(this)  {
@@ -900,10 +965,6 @@ public class DataNode extends Configured
     return streamingAddr.getPort();
   }
   
-  String getStorageId() {
-    return storage.getStorageID();
-  }
-
   /**
    * @return name useful for logging
    */
@@ -989,34 +1050,6 @@ public class DataNode extends Configured
     return metrics;
   }
   
-  public static void setNewStorageID(DatanodeID dnId) {
-    LOG.info("Datanode is " + dnId);
-    dnId.setStorageID(createNewStorageId(dnId.getXferPort()));
-  }
-  
-  /**
-   * @return a unique storage ID of form "DS-randInt-ipaddr-port-timestamp"
-   */
-  static String createNewStorageId(int port) {
-    // It is unlikely that we will create a non-unique storage ID
-    // for the following reasons:
-    // a) SecureRandom is a cryptographically strong random number generator
-    // b) IP addresses will likely differ on different hosts
-    // c) DataNode xfer ports will differ on the same host
-    // d) StorageIDs will likely be generated at different times (in ms)
-    // A conflict requires that all four conditions are violated.
-    // NB: The format of this string can be changed in the future without
-    // requiring that old SotrageIDs be updated.
-    String ip = "unknownIP";
-    try {
-      ip = DNS.getDefaultIP("default");
-    } catch (UnknownHostException ignored) {
-      LOG.warn("Could not find an IP address for the \"default\" inteface.");
-    }
-    int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
-    return "DS-" + rand + "-" + ip + "-" + port + "-" + Time.now();
-  }
-  
   /** Ensure the authentication method is kerberos */
   private void checkKerberosAuthMethod(String msg) throws IOException {
     // User invoking the call must be same as the datanode user
@@ -1342,8 +1375,10 @@ public class DataNode extends Configured
     // Check if NN recorded length matches on-disk length 
     long onDiskLength = data.getLength(block);
     if (block.getNumBytes() > onDiskLength) {
+      FsVolumeSpi volume = getFSDataset().getVolume(block);
       // Shorter on-disk len indicates corruption so report NN the corrupt block
-      bpos.reportBadBlocks(block);
+      bpos.reportBadBlocks(
+          block, volume.getStorageID(), volume.getStorageType());
       LOG.warn("Can't replicate block " + block
           + " because on-disk length " + onDiskLength 
           + " is shorter than NameNode recorded length " + block.getNumBytes());
@@ -1607,11 +1642,11 @@ public class DataNode extends Configured
    * @param block
    * @param delHint
    */
-  void closeBlock(ExtendedBlock block, String delHint) {
+  void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
     metrics.incrBlocksWritten();
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint);
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
     } else {
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
@@ -1675,17 +1710,32 @@ public class DataNode extends Configured
       printUsage(System.err);
       return null;
     }
-    Collection<URI> dataDirs = getStorageDirs(conf);
+    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
         DFS_DATANODE_USER_NAME_KEY);
-    return makeInstance(dataDirs, conf, resources);
+    return makeInstance(dataLocations, conf, resources);
   }
 
-  static Collection<URI> getStorageDirs(Configuration conf) {
-    Collection<String> dirNames =
-      conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    return Util.stringCollectionAsURIs(dirNames);
+  public static List<StorageLocation> getStorageLocations(Configuration conf) {
+    Collection<String> rawLocations =
+        conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    List<StorageLocation> locations =
+        new ArrayList<StorageLocation>(rawLocations.size());
+
+    for(String locationString : rawLocations) {
+      final StorageLocation location;
+      try {
+        location = StorageLocation.parse(locationString);
+      } catch (IOException ioe) {
+        throw new IllegalArgumentException("Failed to parse conf property "
+            + DFS_DATANODE_DATA_DIR_KEY + ": " + locationString, ioe);
+      }
+
+      locations.add(location);
+    }
+
+    return locations;
   }
 
   /** Instantiate & Start a single datanode daemon and wait for it to finish.
@@ -1751,57 +1801,52 @@ public class DataNode extends Configured
    * no directory from this directory list can be created.
    * @throws IOException
    */
-  static DataNode makeInstance(Collection<URI> dataDirs, Configuration conf,
-      SecureResources resources) throws IOException {
+  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
+      Configuration conf, SecureResources resources) throws IOException {
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(
         conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                  DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
     DataNodeDiskChecker dataNodeDiskChecker =
         new DataNodeDiskChecker(permission);
-    ArrayList<File> dirs =
-        getDataDirsFromURIs(dataDirs, localFS, dataNodeDiskChecker);
+    List<StorageLocation> locations =
+        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
     DefaultMetricsSystem.initialize("DataNode");
 
-    assert dirs.size() > 0 : "number of data directories should be > 0";
-    return new DataNode(conf, dirs, resources);
+    assert locations.size() > 0 : "number of data directories should be > 0";
+    return new DataNode(conf, locations, resources);
   }
 
   // DataNode ctor expects AbstractList instead of List or Collection...
-  static ArrayList<File> getDataDirsFromURIs(Collection<URI> dataDirs,
+  static List<StorageLocation> checkStorageLocations(
+      Collection<StorageLocation> dataDirs,
       LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
           throws IOException {
-    ArrayList<File> dirs = new ArrayList<File>();
+    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
     StringBuilder invalidDirs = new StringBuilder();
-    for (URI dirURI : dataDirs) {
-      if (!"file".equalsIgnoreCase(dirURI.getScheme())) {
-        LOG.warn("Unsupported URI schema in " + dirURI + ". Ignoring ...");
-        invalidDirs.append("\"").append(dirURI).append("\" ");
-        continue;
-      }
-      // drop any (illegal) authority in the URI for backwards compatibility
-      File dir = new File(dirURI.getPath());
+    for (StorageLocation location : dataDirs) {
+      final URI uri = location.getUri();
       try {
-        dataNodeDiskChecker.checkDir(localFS, new Path(dir.toURI()));
-        dirs.add(dir);
+        dataNodeDiskChecker.checkDir(localFS, new Path(uri));
+        locations.add(location);
       } catch (IOException ioe) {
         LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
-            + dir + " : ", ioe);
-        invalidDirs.append("\"").append(dirURI.getPath()).append("\" ");
+            + location.getFile() + " : ", ioe);
+        invalidDirs.append("\"").append(uri.getPath()).append("\" ");
       }
     }
-    if (dirs.size() == 0) {
+    if (locations.size() == 0) {
       throw new IOException("All directories in "
           + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
           + invalidDirs);
     }
-    return dirs;
+    return locations;
   }
 
   @Override
   public String toString() {
     return "DataNode{data=" + data + ", localName='" + getDisplayName()
-        + "', storageID='" + getStorageId() + "', xmitsInProgress="
+        + "', datanodeUuid='" + storage.getDatanodeUuid() + "', xmitsInProgress="
         + xmitsInProgress.get() + "}";
   }
 
@@ -1855,7 +1900,6 @@ public class DataNode extends Configured
   }
 
   /**
-   * This method is used for testing. 
    * Examples are adding and deleting blocks directly.
    * The most common usage will be when the data node's storage is simulated.
    * 
@@ -1955,7 +1999,7 @@ public class DataNode extends Configured
     ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
     newBlock.setGenerationStamp(recoveryId);
     newBlock.setNumBytes(newLength);
-    notifyNamenodeReceivedBlock(newBlock, "");
+    notifyNamenodeReceivedBlock(newBlock, "", storageID);
     return storageID;
   }
 
@@ -2415,6 +2459,10 @@ public class DataNode extends Configured
     return dnConf;
   }
 
+  public String getDatanodeUuid() {
+    return id == null ? null : id.getDatanodeUuid();
+  }
+
   boolean shouldRun() {
     return shouldRun;
   }

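getStorageLocations() replaces the old URI list: each dfs.datanode.data.dir entry is run through StorageLocation.parse(), which in this change set is assumed to accept an optional [TYPE] prefix ahead of the directory, e.g. [SSD]/data/dn1. A standalone sketch of that parsing, for illustration only; the regex, the DISK default, and the sample config value are assumptions, not the committed code:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone model of parsing dfs.datanode.data.dir entries into
// (storage type, path) pairs, with DISK as the assumed default type.
class StorageLocationSketch {
  private static final Pattern WITH_TYPE = Pattern.compile("^\\[(\\w*)\\](.+)$");

  static String[] parse(String raw) {
    Matcher m = WITH_TYPE.matcher(raw);
    if (m.matches()) {
      return new String[] { m.group(1).toUpperCase(), m.group(2) };
    }
    return new String[] { "DISK", raw };  // assumed default when no prefix
  }

  public static void main(String[] args) {
    String confValue = "[SSD]/data/dn1,/data/dn2";  // hypothetical config value
    List<String[]> locations = new ArrayList<>();
    for (String entry : confValue.split(",")) {
      locations.add(parse(entry.trim()));
    }
    for (String[] loc : locations) {
      System.out.println(loc[0] + " -> " + loc[1]);
    }
  }
}
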
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Mon Jan  6 23:58:33 2014
@@ -24,13 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +44,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -71,8 +66,13 @@ public class DataStorage extends Storage
   public final static String STORAGE_DIR_FINALIZED = "finalized";
   public final static String STORAGE_DIR_TMP = "tmp";
 
-  /** Unique storage ID. {@see DataNode#createNewStorageId(int)} for details */
-  private String storageID;
+  /**
+   * Datanode UUID that this storage is currently attached to. This
+   *  is the same as the legacy StorageID for datanodes that were
+   *  upgraded from a pre-UUID version. For compatibility with prior
+   *  versions of Datanodes we cannot make this field a UUID.
+   */
+  private String datanodeUuid = null;
 
   // Flag to ensure we only initialize storage once
   private boolean initialized = false;
@@ -84,33 +84,29 @@ public class DataStorage extends Storage
 
   DataStorage() {
     super(NodeType.DATA_NODE);
-    storageID = "";
   }
   
   public StorageInfo getBPStorage(String bpid) {
     return bpStorageMap.get(bpid);
   }
   
-  public DataStorage(StorageInfo storageInfo, String strgID) {
+  public DataStorage(StorageInfo storageInfo) {
     super(NodeType.DATA_NODE, storageInfo);
-    this.storageID = strgID;
   }
 
-  /** @return storage ID. */
-  public synchronized String getStorageID() {
-    return storageID;
+  public synchronized String getDatanodeUuid() {
+    return datanodeUuid;
   }
-  
-  synchronized void setStorageID(String newStorageID) {
-    this.storageID = newStorageID;
+
+  public synchronized void setDatanodeUuid(String newDatanodeUuid) {
+    this.datanodeUuid = newDatanodeUuid;
   }
 
   /** Create an ID for this storage. */
-  public synchronized void createStorageID(int datanodePort) {
-    if (storageID != null && !storageID.isEmpty()) {
-      return;
+  public synchronized void createStorageID(StorageDirectory sd) {
+    if (sd.getStorageUuid() == null) {
+      sd.setStorageUuid(DatanodeStorage.generateUuid());
     }
-    storageID = DataNode.createNewStorageId(datanodePort);
   }
   
   /**
@@ -128,7 +124,8 @@ public class DataStorage extends Storage
    * @throws IOException
    */
   synchronized void recoverTransitionRead(DataNode datanode,
-      NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
       throws IOException {
     if (initialized) {
       // DN storage has been initialized, no need to do anything
@@ -144,8 +141,8 @@ public class DataStorage extends Storage
     // Format and recover.
     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
-    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dataDir = it.next();
+    for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
+      File dataDir = it.next().getFile();
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
@@ -162,7 +159,7 @@ public class DataStorage extends Storage
         case NOT_FORMATTED: // format
           LOG.info("Storage directory " + dataDir + " is not formatted");
           LOG.info("Formatting ...");
-          format(sd, nsInfo);
+          format(sd, nsInfo, datanode.getDatanodeUuid());
           break;
         default:  // recovery part is common
           sd.doRecover(curState);
@@ -191,11 +188,9 @@ public class DataStorage extends Storage
       doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
         "Data-node and name-node layout versions must be the same.";
+      createStorageID(getStorageDir(idx));
     }
     
-    // make sure we have storage id set - if not - generate new one
-    createStorageID(datanode.getXferPort());
-    
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
     
@@ -214,14 +209,14 @@ public class DataStorage extends Storage
    * @throws IOException on error
    */
   void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+      Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
     // First ensure datanode level format/snapshot/rollback is completed
     recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
-    
+
     // Create list of storage directories for the block pool
     Collection<File> bpDataDirs = new ArrayList<File>();
-    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dnRoot = it.next();
+    for(StorageLocation dir : dataDirs) {
+      File dnRoot = dir.getFile();
       File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
           STORAGE_DIR_CURRENT));
       bpDataDirs.add(bpRoot);
@@ -263,19 +258,28 @@ public class DataStorage extends Storage
     }
   }
 
-  void format(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+  void format(StorageDirectory sd, NamespaceInfo nsInfo,
+              String datanodeUuid) throws IOException {
     sd.clearDirectory(); // create directory
     this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
     this.clusterID = nsInfo.getClusterID();
     this.namespaceID = nsInfo.getNamespaceID();
     this.cTime = 0;
-    // store storageID as it currently is
+    this.datanodeUuid = datanodeUuid;
+
+    if (sd.getStorageUuid() == null) {
+      // Assign a new Storage UUID.
+      sd.setStorageUuid(DatanodeStorage.generateUuid());
+    }
+
     writeProperties(sd);
   }
 
   /*
    * Set ClusterID, StorageID, StorageType, CTime into
-   * DataStorage VERSION file
+   * DataStorage VERSION file.
+   * Always called just before writing the properties to
+   * the VERSION file.
   */
   @Override
   protected void setPropertiesFromFields(Properties props, 
@@ -285,7 +289,13 @@ public class DataStorage extends Storage
     props.setProperty("clusterID", clusterID);
     props.setProperty("cTime", String.valueOf(cTime));
     props.setProperty("layoutVersion", String.valueOf(layoutVersion));
-    props.setProperty("storageID", getStorageID());
+    props.setProperty("storageID", sd.getStorageUuid());
+
+    String datanodeUuid = getDatanodeUuid();
+    if (datanodeUuid != null) {
+      props.setProperty("datanodeUuid", datanodeUuid);
+    }
+
     // Set NamespaceID in version before federation
     if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       props.setProperty("namespaceID", String.valueOf(namespaceID));
@@ -295,6 +305,7 @@ public class DataStorage extends Storage
   /*
    * Read ClusterID, StorageID, StorageType, CTime from 
    * DataStorage VERSION file and verify them.
+   * Always called just after reading the properties from the VERSION file.
    */
   @Override
   protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
@@ -318,20 +329,36 @@ public class DataStorage extends Storage
       setNamespaceID(props, sd);
     }
     
+
     // Validate the storage ID; it may be empty.
     String ssid = props.getProperty("storageID");
     if (ssid == null) {
       throw new InconsistentFSStateException(sd.getRoot(), "file "
           + STORAGE_FILE_VERSION + " is invalid.");
     }
-    String sid = getStorageID();
-    if (!(sid.equals("") || ssid.equals("") || sid.equals(ssid))) {
+    String sid = sd.getStorageUuid();
+    if (!(sid == null || sid.equals("") ||
+          ssid.equals("") || sid.equals(ssid))) {
       throw new InconsistentFSStateException(sd.getRoot(),
           "has incompatible storage Id.");
     }
-    
-    if (sid.equals("")) { // update id only if it was empty
-      setStorageID(ssid);
+
+    if (sid == null) { // update id only if it was null
+      sd.setStorageUuid(ssid);
+    }
+
+    // Update the datanode UUID if present.
+    if (props.getProperty("datanodeUuid") != null) {
+      String dnUuid = props.getProperty("datanodeUuid");
+
+      if (getDatanodeUuid() == null) {
+        setDatanodeUuid(dnUuid);
+      } else if (getDatanodeUuid().compareTo(dnUuid) != 0) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "Root " + sd.getRoot() + ": DatanodeUuid=" + dnUuid +
+            " does not match " + getDatanodeUuid() + " from another" +
+            " StorageDirectory.");
+      }
     }
   }
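
For readers following the VERSION-file changes above: a minimal standalone sketch of the write/validate cycle, using java.util.Properties directly. The class and method names are hypothetical, and DatanodeStorage.generateUuid() is approximated with UUID.randomUUID().

    import java.util.Properties;
    import java.util.UUID;

    class VersionFileSketch {
      // Mirrors setPropertiesFromFields: one storageID per directory,
      // plus the node-wide datanodeUuid once it is known.
      static Properties write(String dirStorageUuid, String datanodeUuid) {
        Properties props = new Properties();
        props.setProperty("storageID",
            dirStorageUuid != null ? dirStorageUuid : UUID.randomUUID().toString());
        if (datanodeUuid != null) {
          props.setProperty("datanodeUuid", datanodeUuid);
        }
        return props;
      }

      // Mirrors setFieldsFromProperties: every storage directory of one
      // datanode must report the same datanodeUuid.
      static void validate(Properties props, String expectedDatanodeUuid) {
        String dnUuid = props.getProperty("datanodeUuid");
        if (dnUuid != null && expectedDatanodeUuid != null
            && !dnUuid.equals(expectedDatanodeUuid)) {
          throw new IllegalStateException("DatanodeUuid " + dnUuid
              + " does not match " + expectedDatanodeUuid);
        }
      }
    }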
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Jan  6 23:58:33 2014
@@ -284,7 +284,7 @@ class DataXceiver extends Receiver imple
         BlockSender.ClientTraceLog.info(String.format(
             "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
             " blockid: %s, srvID: %s, success: %b",
-            blk.getBlockId(), dnR.getStorageID(), (fis != null)
+            blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
           ));
       }
       if (fis != null) {
@@ -317,7 +317,7 @@ class DataXceiver extends Receiver imple
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
             "%d", "HDFS_READ", clientName, "%d",
-            dnR.getStorageID(), block, "%d")
+            dnR.getDatanodeUuid(), block, "%d")
         : dnR + " Served block " + block + " to " +
             remoteAddress;
 
@@ -447,6 +447,7 @@ class DataXceiver extends Receiver imple
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
+    final String storageUuid;
     try {
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -457,8 +458,10 @@ class DataXceiver extends Receiver imple
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
             cachingStrategy);
+        storageUuid = blockReceiver.getStorageUuid();
       } else {
-        datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
+        storageUuid = datanode.data.recoverClose(
+            block, latestGenerationStamp, minBytesRcvd);
       }
 
       //
@@ -590,7 +593,7 @@ class DataXceiver extends Receiver imple
       // the block is finalized in the PacketResponder.
       if (isDatanode ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
         LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
             + localAddress + " of size " + block.getNumBytes());
       }
@@ -859,9 +862,11 @@ class DataXceiver extends Receiver imple
           dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
-      datanode.notifyNamenodeReceivedBlock(block, delHint);
+      datanode.notifyNamenodeReceivedBlock(
+          block, delHint, blockReceiver.getStorageUuid());
 
-      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
+      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+          + ", delHint=" + delHint);
       
     } catch (IOException ioe) {
       opStatus = ERROR;

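The writeBlock change above threads the storage uuid of the receiving volume through to the block-close notification. A simplified sketch of that flow; ReceiverSketch and DatasetSketch are assumed stand-ins for BlockReceiver and FsDatasetSpi, showing only the uuid threading:

    // Stand-ins for BlockReceiver / FsDatasetSpi; not the committed types.
    interface ReceiverSketch { String getStorageUuid(); }
    interface DatasetSketch {
      String recoverClose(long blockId, long newGS, long expectedLen);
    }

    class WritePathSketch {
      // On the normal write path the uuid comes from the receiver; on
      // PIPELINE_CLOSE_RECOVERY it comes from recoverClose, which now
      // returns the storage uuid of the recovered replica.
      static String storageUuidFor(boolean receiving, ReceiverSketch receiver,
          DatasetSketch dataset, long blockId, long newGS, long expectedLen) {
        return receiving
            ? receiver.getStorageUuid()
            : dataset.recoverClose(blockId, newGS, expectedLen);
        // The result is then passed to closeBlock(block, delHint, storageUuid).
      }
    }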
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Mon Jan  6 23:58:33 2014
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -230,10 +229,6 @@ public class DirectoryScanner implements
       throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
     }
 
-    ScanInfo(long blockId) {
-      this(blockId, null, null, null);
-    }
-
     ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
       this.blockId = blockId;
       String condensedVolPath = vol == null ? null :
@@ -439,8 +434,8 @@ public class DirectoryScanner implements
         diffs.put(bpid, diffRecord);
         
         statsRecord.totalBlocks = blockpoolReport.length;
-        List<Block> bl = dataset.getFinalizedBlocks(bpid);
-        Block[] memReport = bl.toArray(new Block[bl.size()]);
+        List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
+        FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]);
         Arrays.sort(memReport); // Sort based on blockId
   
         int d = 0; // index for blockpoolReport
@@ -458,7 +453,8 @@ public class DirectoryScanner implements
           }
           if (info.getBlockId() > memBlock.getBlockId()) {
             // Block is missing on the disk
-            addDifference(diffRecord, statsRecord, memBlock.getBlockId());
+            addDifference(diffRecord, statsRecord,
+                          memBlock.getBlockId(), info.getVolume());
             m++;
             continue;
           }
@@ -478,7 +474,9 @@ public class DirectoryScanner implements
           m++;
         }
         while (m < memReport.length) {
-          addDifference(diffRecord, statsRecord, memReport[m++].getBlockId());
+          FinalizedReplica current = memReport[m++];
+          addDifference(diffRecord, statsRecord,
+                        current.getBlockId(), current.getVolume());
         }
         while (d < blockpoolReport.length) {
           statsRecord.missingMemoryBlocks++;
@@ -502,10 +500,11 @@ public class DirectoryScanner implements
 
   /** Block is not found on the disk */
   private void addDifference(LinkedList<ScanInfo> diffRecord,
-                             Stats statsRecord, long blockId) {
+                             Stats statsRecord, long blockId,
+                             FsVolumeSpi vol) {
     statsRecord.missingBlockFile++;
     statsRecord.missingMetaFile++;
-    diffRecord.add(new ScanInfo(blockId));
+    diffRecord.add(new ScanInfo(blockId, null, null, vol));
   }
 
   /** Is the given volume still valid in the dataset? */

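For orientation on the reworked scan loop: the disk report and the in-memory report are both sorted by block id and walked in lock-step, and a block present only in memory is now recorded together with the volume it is expected on. A self-contained sketch with simplified stand-in types (MemBlock approximates FinalizedReplica):

    import java.util.ArrayList;
    import java.util.List;

    class ScanMergeSketch {
      static class MemBlock {               // stand-in for FinalizedReplica
        final long id;
        final String volumeUuid;
        MemBlock(long id, String volumeUuid) {
          this.id = id; this.volumeUuid = volumeUuid;
        }
      }

      /** Blocks present in memory but missing on disk, with their volume. */
      static List<MemBlock> missingOnDisk(long[] diskIds, List<MemBlock> mem) {
        List<MemBlock> diff = new ArrayList<MemBlock>();
        int d = 0, m = 0;
        while (m < mem.size() && d < diskIds.length) {
          if (diskIds[d] == mem.get(m).id) { d++; m++; }           // matched
          else if (diskIds[d] > mem.get(m).id) diff.add(mem.get(m++)); // memory-only
          else d++;                                   // disk-only case elided
        }
        while (m < mem.size()) diff.add(mem.get(m++)); // tail of memory list
        return diff;
      }
    }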
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java Mon Jan  6 23:58:33 2014
@@ -54,4 +54,9 @@ public interface Replica {
    * @return the number of bytes that are visible to readers
    */
   public long getVisibleLength();
+
+  /**
+   * Return the storageUuid of the volume that stores this replica.
+   */
+  public String getStorageUuid();
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Mon Jan  6 23:58:33 2014
@@ -137,6 +137,14 @@ abstract public class ReplicaInfo extend
   void setVolume(FsVolumeSpi vol) {
     this.volume = vol;
   }
+
+  /**
+   * Get the storageUuid of the volume that stores this replica.
+   */
+  @Override
+  public String getStorageUuid() {
+    return volume.getStorageID();
+  }
   
   /**
    * Return the parent directory path where this replica is located

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1556076&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Mon Jan  6 23:58:33 2014
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.common.Util;
+
+/**
+ * Encapsulates the URI and storage medium that together describe a
+ * storage directory.
+ * If no storage medium is specified, it defaults to DISK.
+ */
+@InterfaceAudience.Private
+public class StorageLocation {
+  final StorageType storageType;
+  final File file;
+
+  /** Regular expression that describes a storage uri with a storage type.
+   *  e.g. [Disk]/storages/storage1/
+   */
+  private static final Pattern regex = Pattern.compile("^\\[(\\w*)\\](.+)$");
+
+  private StorageLocation(StorageType storageType, URI uri) {
+    this.storageType = storageType;
+
+    if (uri.getScheme() == null ||
+        "file".equalsIgnoreCase(uri.getScheme())) {
+      // drop any (illegal) authority in the URI for backwards compatibility
+      this.file = new File(uri.getPath());
+    } else {
+      throw new IllegalArgumentException("Unsupported URI scheme in " + uri);
+    }
+  }
+
+  public StorageType getStorageType() {
+    return this.storageType;
+  }
+
+  URI getUri() {
+    return file.toURI();
+  }
+
+  public File getFile() {
+    return this.file;
+  }
+
+  /**
+   * Attempt to parse a storage URI with an optional storage type prefix.
+   * The storage type component of the URI is case-insensitive.
+   *
+   * @param rawLocation Location string of the format [type]uri, where [type]
+   *                    is optional.
+   * @return A StorageLocation object parsed from the location string.
+   * @throws IOException if the location URI cannot be parsed.
+   */
+  static StorageLocation parse(String rawLocation) throws IOException {
+    Matcher matcher = regex.matcher(rawLocation);
+    StorageType storageType = StorageType.DEFAULT;
+    String location = rawLocation;
+
+    if (matcher.matches()) {
+      String classString = matcher.group(1);
+      location = matcher.group(2);
+      if (!classString.isEmpty()) {
+        storageType = StorageType.valueOf(classString.toUpperCase());
+      }
+    }
+
+    return new StorageLocation(storageType, Util.stringAsURI(location));
+  }
+
+  @Override
+  public String toString() {
+    return "[" + storageType + "]" + file.toURI();
+  }
+}
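
Example use of the parser added above. Paths are illustrative, and since parse() is package-private the caller is assumed to live in the same package; the SSD constant assumes the heterogeneous-storage StorageType values, with StorageType.DEFAULT being DISK:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.IOException;

    public class StorageLocationExample {
      public static void main(String[] args) throws IOException {
        // "[SSD]/data/dn1" -> storage type SSD, directory /data/dn1
        StorageLocation ssd = StorageLocation.parse("[SSD]/data/dn1");
        System.out.println(ssd.getStorageType() + " " + ssd.getFile());

        // No [type] prefix -> StorageType.DEFAULT (DISK)
        StorageLocation plain = StorageLocation.parse("/data/dn2");
        System.out.println(plain);   // prints [DISK]file:/data/dn2
      }
    }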

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Mon Jan  6 23:58:33 2014
@@ -34,12 +34,15 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -86,17 +89,18 @@ public interface FsDatasetSpi<V extends 
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
+  /** @return one or more storage reports for attached volumes. */
+  public StorageReport[] getStorageReports(String bpid)
+      throws IOException;
+
   /** @return the volume that contains a replica of the block. */
   public V getVolume(ExtendedBlock b);
 
   /** @return a volume information map (name => info). */
   public Map<String, Object> getVolumeInfoMap();
 
-  /** @return a list of block pools. */
-  public String[] getBlockPoolList();
-
   /** @return a list of finalized blocks for the given block pool. */
-  public List<Block> getFinalizedBlocks(String bpid);
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid);
 
   /**
    * Check whether the in-memory block record matches the block on the disk,
@@ -239,9 +243,10 @@ public interface FsDatasetSpi<V extends 
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @return the storage uuid of the replica.
    * @throws IOException
    */
-  public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
+  public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
       ) throws IOException;
   
   /**
@@ -262,12 +267,11 @@ public interface FsDatasetSpi<V extends 
   public void unfinalizeBlock(ExtendedBlock b) throws IOException;
 
   /**
-   * Returns the block report - the full list of blocks stored under a 
-   * block pool
+   * Returns one block report per volume.
    * @param bpid Block Pool Id
-   * @return - the block report - the full list of blocks stored
+   * @return a map from DatanodeStorage to the block report for that volume.
    */
-  public BlockListAsLongs getBlockReport(String bpid);
+  public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid);
 
   /** Does the dataset contain the block? */
   public boolean contains(ExtendedBlock block);
@@ -386,3 +390,4 @@ public interface FsDatasetSpi<V extends 
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException;
 }
+
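
A consumption sketch for the per-storage report API above. The class is hypothetical; the final RPC to the NameNode is left as a comment, since its shape is outside this interface:

    import java.util.Map;

    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

    class BlockReportSketch {
      // One report per storage, rather than one flat list per block pool.
      static void reportAll(FsDatasetSpi<?> dataset, String bpid) {
        Map<DatanodeStorage, BlockListAsLongs> reports =
            dataset.getBlockReports(bpid);
        for (Map.Entry<DatanodeStorage, BlockListAsLongs> e : reports.entrySet()) {
          DatanodeStorage storage = e.getKey();   // identifies the volume
          BlockListAsLongs blocks = e.getValue(); // replicas on that volume
          // ... send (storage, blocks) to the NameNode ...
        }
      }
    }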

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java Mon Jan  6 23:58:33 2014
@@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.StorageType;
+
 /**
  * This is an interface for the underlying volume.
  */
 public interface FsVolumeSpi {
+  /** @return the StorageUuid of the volume */
+  public String getStorageID();
+
   /** @return a list of block pools. */
   public String[] getBlockPoolList();
 
@@ -38,4 +43,6 @@ public interface FsVolumeSpi {
 
   /** @return the directory for the finalized blocks in the block pool. */
   public File getFinalizedDir(String bpid) throws IOException;
+
+  /** @return the StorageType of the volume. */
+  public StorageType getStorageType();
 }
\ No newline at end of file
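
A hypothetical partial implementation of the two accessors added to FsVolumeSpi above; only the new methods are shown, everything else stays abstract:

    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    abstract class VolumeSketch implements FsVolumeSpi {
      private final String storageUuid;      // assigned once, persisted in VERSION
      private final StorageType storageType; // parsed from the [type]uri config

      VolumeSketch(String storageUuid, StorageType storageType) {
        this.storageUuid = storageUuid;
        this.storageType = storageType;
      }

      @Override
      public String getStorageID() { return storageUuid; }

      @Override
      public StorageType getStorageType() { return storageType; }
    }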

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1556076&r1=1556075&r2=1556076&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Mon Jan  6 23:58:33 2014
@@ -195,7 +195,7 @@ class FsDatasetAsyncDiskService {
             + " at file " + blockFile + ". Ignored.");
       } else {
         if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
-          datanode.notifyNamenodeDeletedBlock(block);
+          datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
         }
         volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
         LOG.info("Deleted " + block.getBlockPoolId() + " "


