hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1563254 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/or...
Date: Fri, 31 Jan 2014 21:00:33 GMT
Author: arp
Date: Fri Jan 31 21:00:33 2014
New Revision: 1563254

URL: http://svn.apache.org/r1563254
Log:
HDFS-5153. Datanode should send block reports for each storage in a separate message. (Arpit Agarwal)
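
In rough terms, the DataNode now compares its total block count against the new
dfs.blockreport.split.threshold setting and either batches all per-storage reports
into a single RPC or sends one RPC per storage. The sketch below is illustrative
only (names are hypothetical); the committed logic is in the BPServiceActor diff
further down.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the HDFS-5153 reporting decision, not the
    // committed code: below the threshold, every storage's report goes in one
    // message; at or above it, each storage is reported in its own message.
    class BlockReportSplitSketch {
      static <R> List<List<R>> groupIntoRpcBatches(List<R> perStorageReports,
                                                   long totalBlocks,
                                                   long splitThreshold) {
        List<List<R>> batches = new ArrayList<List<R>>();
        if (totalBlocks < splitThreshold) {
          // One combined message containing all per-storage reports.
          batches.add(perStorageReports);
        } else {
          // One message per storage.
          for (R report : perStorageReports) {
            List<R> single = new ArrayList<R>();
            single.add(report);
            batches.add(single);
          }
        }
        return batches;
      }
    }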

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jan 31 21:00:33 2014
@@ -297,6 +297,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and 
     the corresponding byte value. (jing9)
 
+    HDFS-5153. Datanode should send block reports for each storage in a
+    separate message. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 31 21:00:33 2014
@@ -399,6 +399,8 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 60 * 60 * 1000;
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
+  public static final String  DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold";
+  public static final long    DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000;
   public static final String  DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
   public static final long    DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
   public static final String  DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jan 31 21:00:33 2014
@@ -1616,15 +1616,19 @@ public class BlockManager {
   /**
    * The given storage is reporting all its blocks.
    * Update the (storage-->block list) and (block-->storage list) maps.
+   *
+   * @return true if all known storages of the given DN have finished reporting.
+   * @throws IOException
    */
-  public void processReport(final DatanodeID nodeID,
+  public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage, final String poolId,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
     final long endTime;
+    DatanodeDescriptor node;
     try {
-      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         throw new IOException(
             "ProcessReport from dead or unregistered node: " + nodeID);
@@ -1632,13 +1636,21 @@ public class BlockManager {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
+      DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
+
+      if (storageInfo == null) {
+        // We handle this for backwards compatibility.
+        storageInfo = node.updateStorage(storage);
+        LOG.warn("Unknown storageId " + storage.getStorageID() +
+                    ", updating storageMap. This indicates a buggy " +
+                    "DataNode that isn't heartbeating correctly.");
+      }
       if (namesystem.isInStartupSafeMode()
           && storageInfo.getBlockReportCount() > 0) {
         blockLog.info("BLOCK* processReport: "
             + "discarded non-initial block report from " + nodeID
             + " because namenode still in startup phase");
-        return;
+        return !node.hasStaleStorages();
       }
 
       if (storageInfo.numBlocks() == 0) {
@@ -1655,7 +1667,7 @@ public class BlockManager {
       storageInfo.receivedBlockReport();
       if (staleBefore && !storageInfo.areBlockContentsStale()) {
         LOG.info("BLOCK* processReport: Received first block report from "
-            + node + " after starting up or becoming active. Its block "
+            + storage + " after starting up or becoming active. Its block "
             + "contents are no longer considered stale");
         rescanPostponedMisreplicatedBlocks();
       }
@@ -1670,9 +1682,10 @@ public class BlockManager {
     if (metrics != null) {
       metrics.addBlockReport((int) (endTime - startTime));
     }
-    blockLog.info("BLOCK* processReport: from "
-        + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+    blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID()
+        + " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
         + ", processing time: " + (endTime - startTime) + " msecs");
+    return !node.hasStaleStorages();
   }
 
   /**
@@ -1827,7 +1840,7 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
+    final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
 
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Jan 31 21:00:33 2014
@@ -257,6 +257,17 @@ public class DatanodeDescriptor extends 
     }
   }
 
+  boolean hasStaleStorages() {
+    synchronized (storageMap) {
+      for (DatanodeStorageInfo storage : storageMap.values()) {
+        if (storage.areBlockContentsStale()) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
   /**
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Fri Jan 31 21:00:33 2014
@@ -22,11 +22,9 @@ import static org.apache.hadoop.util.Tim
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -437,75 +435,100 @@ class BPServiceActor implements Runnable
 
   /**
    * Report the list blocks to the Namenode
+   * @return DatanodeCommands returned by the NN. May be null.
    * @throws IOException
    */
-  DatanodeCommand blockReport() throws IOException {
+  List<DatanodeCommand> blockReport() throws IOException {
     // send block report if timer has expired.
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
+    final long startTime = now();
+    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
+      return null;
+    }
+
+    ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
 
-      // Flush any block information that precedes the block report. Otherwise
-      // we have a chance that we will miss the delHint information
-      // or we will report an RBW replica after the BlockReport already reports
-      // a FINALIZED one.
-      reportReceivedDeletedBlocks();
-
-      // Send one block report per known storage.
-
-      // Create block report
-      long brCreateStartTime = now();
-      long totalBlockCount = 0;
-
-      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-          dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
-
-      // Send block report
-      long brSendStartTime = now();
-      StorageBlockReport[] reports =
-          new StorageBlockReport[perVolumeBlockLists.size()];
-
-      int i = 0;
-      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
-        DatanodeStorage dnStorage = kvPair.getKey();
-        BlockListAsLongs blockList = kvPair.getValue();
-        totalBlockCount += blockList.getNumberOfBlocks();
-
-        reports[i++] =
-            new StorageBlockReport(
-              dnStorage, blockList.getBlockListAsLongs());
-      }
-
-      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
-
-      // Log the block report processing stats from Datanode perspective
-      long brSendCost = now() - brSendStartTime;
-      long brCreateCost = brSendStartTime - brCreateStartTime;
-      dn.getMetrics().addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + totalBlockCount
-          + " blocks took " + brCreateCost + " msec to generate and "
-          + brSendCost + " msecs for RPC and NN processing");
-
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
-        /* say the last block report was at 8:20:14. The current report
-         * should have started around 9:20:14 (default 1 hour interval).
-         * If current time is :
-         *   1) normal like 9:20:18, next report should be at 10:20:14
-         *   2) unexpected like 11:35:43, next report should be at 12:20:14
-         */
-        lastBlockReport += (now() - lastBlockReport) /
-        dnConf.blockReportInterval * dnConf.blockReportInterval;
+    // Flush any block information that precedes the block report. Otherwise
+    // we have a chance that we will miss the delHint information
+    // or we will report an RBW replica after the BlockReport already reports
+    // a FINALIZED one.
+    reportReceivedDeletedBlocks();
+    lastDeletedReport = startTime;
+
+    long brCreateStartTime = now();
+    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
+
+    // Convert the reports to the format expected by the NN.
+    int i = 0;
+    int totalBlockCount = 0;
+    StorageBlockReport reports[] =
+        new StorageBlockReport[perVolumeBlockLists.size()];
+
+    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      BlockListAsLongs blockList = kvPair.getValue();
+      reports[i++] = new StorageBlockReport(
+          kvPair.getKey(), blockList.getBlockListAsLongs());
+      totalBlockCount += blockList.getNumberOfBlocks();
+    }
+
+    // Send the reports to the NN.
+    int numReportsSent;
+    long brSendStartTime = now();
+    if (totalBlockCount < dnConf.blockReportSplitThreshold) {
+      // Below split threshold, send all reports in a single message.
+      numReportsSent = 1;
+      DatanodeCommand cmd =
+          bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
+      if (cmd != null) {
+        cmds.add(cmd);
+      }
+    } else {
+      // Send one block report per message.
+      numReportsSent = i;
+      for (StorageBlockReport report : reports) {
+        StorageBlockReport singleReport[] = { report };
+        DatanodeCommand cmd = bpNamenode.blockReport(
+            bpRegistration, bpos.getBlockPoolId(), singleReport);
+        if (cmd != null) {
+          cmds.add(cmd);
+        }
       }
-      LOG.info("sent block report, processed command:" + cmd);
     }
-    return cmd;
+
+    // Log the block report processing stats from Datanode perspective
+    long brSendCost = now() - brSendStartTime;
+    long brCreateCost = brSendStartTime - brCreateStartTime;
+    dn.getMetrics().addBlockReport(brSendCost);
+    LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
+        " blocks total. Took " + brCreateCost +
+        " msec to generate and " + brSendCost +
+        " msecs for RPC and NN processing. " +
+        " Got back commands " +
+            (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
+
+    scheduleNextBlockReport(startTime);
+    return cmds.size() == 0 ? null : cmds;
+  }
+
+  private void scheduleNextBlockReport(long previousReportStartTime) {
+    // If we have sent the first set of block reports, then wait a random
+    // time before we start the periodic block reports.
+    if (resetBlockReportTime) {
+      lastBlockReport = previousReportStartTime -
+          DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
+      resetBlockReportTime = false;
+    } else {
+      /* say the last block report was at 8:20:14. The current report
+       * should have started around 9:20:14 (default 1 hour interval).
+       * If current time is :
+       *   1) normal like 9:20:18, next report should be at 10:20:14
+       *   2) unexpected like 11:35:43, next report should be at 12:20:14
+       */
+      lastBlockReport += (now() - lastBlockReport) /
+          dnConf.blockReportInterval * dnConf.blockReportInterval;
+    }
   }
-  
+
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
     if (dn.getFSDataset().getCacheCapacity() == 0) {
@@ -513,7 +536,7 @@ class BPServiceActor implements Runnable
     }
     // send cache report if timer has expired.
     DatanodeCommand cmd = null;
-    long startTime = Time.monotonicNow();
+    final long startTime = Time.monotonicNow();
     if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending cacheReport from service actor: " + this);
@@ -613,7 +636,7 @@ class BPServiceActor implements Runnable
     //
     while (shouldRun()) {
       try {
-        long startTime = now();
+        final long startTime = now();
 
         //
         // Every so often, send heartbeat or block-report
@@ -659,10 +682,10 @@ class BPServiceActor implements Runnable
           lastDeletedReport = startTime;
         }
 
-        DatanodeCommand cmd = blockReport();
-        processCommand(new DatanodeCommand[]{ cmd });
+        List<DatanodeCommand> cmds = blockReport();
+        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
-        cmd = cacheReport();
+        DatanodeCommand cmd = cacheReport();
         processCommand(new DatanodeCommand[]{ cmd });
 
         // Now safe to start scanning the block pool.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Jan 31 21:00:33 2014
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -70,6 +72,7 @@ public class DNConf {
   final long readaheadLength;
   final long heartBeatInterval;
   final long blockReportInterval;
+  final long blockReportSplitThreshold;
   final long deleteReportInterval;
   final long initialBlockReportDelay;
   final long cacheReportInterval;
@@ -117,6 +120,8 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+                                            DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
         DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
     

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Jan 31 21:00:33 2014
@@ -982,13 +982,18 @@ class NameNodeRpcServer implements Namen
            + "from " + nodeReg + ", reports.length=" + reports.length);
     }
     final BlockManager bm = namesystem.getBlockManager(); 
+    boolean hasStaleStorages = true;
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
-      bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
     }
 
-    if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
+    if (nn.getFSImage().isUpgradeFinalized() &&
+        !nn.isStandbyState() &&
+        !hasStaleStorages) {
       return new FinalizeCommand(poolId);
+    }
+
     return null;
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1563254&r1=1563253&r2=1563254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jan 31 21:00:33 2014
@@ -483,6 +483,20 @@
 </property>
 
 <property>
+    <name>dfs.blockreport.split.threshold</name>
+    <value>1000000</value>
+    <description>If the number of blocks on the DataNode is below this
+    threshold then it will send block reports for all Storage Directories
+    in a single message.
+
+    If the number of blocks exceeds this threshold then the DataNode will
+    send block reports for each Storage Directory in separate messages.
+
+    Set to zero to always split.
+    </description>
+</property>
+
+<property>
   <name>dfs.datanode.directoryscan.interval</name>
   <value>21600</value>
   <description>Interval in seconds for Datanode to scan data directories and
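
As a usage note (not part of this change set), the dfs.blockreport.split.threshold
value described above can also be overridden programmatically, for example in a test
configuration; the value 500,000 below is an arbitrary example, not a recommendation.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Hedged example, not committed code: with this setting, a DataNode
    // holding 500,000 or more blocks sends one block report per storage.
    public class SplitThresholdExample {
      public static Configuration withLowerSplitThreshold() {
        Configuration conf = new Configuration();
        conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, 500L * 1000);
        return conf;
      }
    }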

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java?rev=1563254&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java Fri Jan 31 21:00:33 2014
@@ -0,0 +1,851 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+
+/**
+ * This is the base class for simulating a variety of situations
+ * when blocks are being intentionally corrupted, unexpectedly modified,
+ * and so on before a block report is happening.
+ *
+ * By overriding {@link #sendBlockReports}, derived classes can test
+ * different variations of how block reports are split across storages
+ * and messages.
+ */
+public abstract class BlockReportTestBase {
+  public static final Log LOG = LogFactory.getLog(BlockReportTestBase.class);
+
+  private static short REPL_FACTOR = 1;
+  private static final int RAND_LIMIT = 2000;
+  private static final long DN_RESCAN_INTERVAL = 5000;
+  private static final long DN_RESCAN_EXTRA_WAIT = 2 * DN_RESCAN_INTERVAL;
+  private static final int DN_N0 = 0;
+  private static final int FILE_START = 0;
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final int NUM_BLOCKS = 10;
+  private static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
+
+  protected MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+
+  private static Random rand = new Random(RAND_LIMIT);
+
+  private static Configuration conf;
+
+  static {
+    initLoggers();
+    resetConfiguration();
+  }
+
+  @Before
+  public void startUpCluster() throws IOException {
+    REPL_FACTOR = 1; //Reset in case a test has modified the value
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    fs.close();
+    cluster.shutdownDataNodes();
+    cluster.shutdown();
+  }
+
+  protected static void resetConfiguration() {
+    conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL);
+  }
+
+  // Generate a block report, optionally corrupting the generation
+  // stamp and/or length of one block.
+  private static StorageBlockReport[] getBlockReports(
+      DataNode dn, String bpid, boolean corruptOneBlockGs,
+      boolean corruptOneBlockLen) {
+    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpid);
+
+    // Send block report
+    StorageBlockReport[] reports =
+        new StorageBlockReport[perVolumeBlockLists.size()];
+    boolean corruptedGs = false;
+    boolean corruptedLen = false;
+
+    int reportIndex = 0;
+    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      DatanodeStorage dnStorage = kvPair.getKey();
+      BlockListAsLongs blockList = kvPair.getValue();
+
+      // Walk the list of blocks until we find one each to corrupt the
+      // generation stamp and length, if so requested.
+      for (int i = 0; i < blockList.getNumberOfBlocks(); ++i) {
+        if (corruptOneBlockGs && !corruptedGs) {
+          blockList.corruptBlockGSForTesting(i, rand);
+          LOG.info("Corrupted the GS for block ID " + i);
+          corruptedGs = true;
+        } else if (corruptOneBlockLen && !corruptedLen) {
+          blockList.corruptBlockLengthForTesting(i, rand);
+          LOG.info("Corrupted the length for block ID " + i);
+          corruptedLen = true;
+        } else {
+          break;
+        }
+      }
+
+      reports[reportIndex++] =
+          new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
+    }
+
+    return reports;
+  }
+
+  /**
+   * Utility routine to send block reports to the NN, either in a single call
+   * or reporting one storage per call.
+   *
+   * @param dnR
+   * @param poolId
+   * @param reports
+   * @throws IOException
+   */
+  protected abstract void sendBlockReports(DatanodeRegistration dnR, String poolId,
+      StorageBlockReport[] reports) throws IOException;
+
+  /**
+   * Test write a file, verifies and closes it. Then the length of the blocks
+   * are messed up and BlockReport is forced.
+   * The modification of blocks' length has to be ignored
+   *
+   * @throws java.io.IOException on an error
+   */
+  @Test(timeout=300000)
+  public void blockReport_01() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+
+    ArrayList<Block> blocks = prepareForRide(filePath, METHOD_NAME, FILE_SIZE);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Number of blocks allocated " + blocks.size());
+    }
+    long[] oldLengths = new long[blocks.size()];
+    int tempLen;
+    for (int i = 0; i < blocks.size(); i++) {
+      Block b = blocks.get(i);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Block " + b.getBlockName() + " before\t" + "Size " +
+            b.getNumBytes());
+      }
+      oldLengths[i] = b.getNumBytes();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Setting new length");
+      }
+      tempLen = rand.nextInt(BLOCK_SIZE);
+      b.set(b.getBlockId(), tempLen, b.getGenerationStamp());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Block " + b.getBlockName() + " after\t " + "Size " +
+            b.getNumBytes());
+      }
+    }
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
+    sendBlockReports(dnR, poolId, reports);
+
+    List<LocatedBlock> blocksAfterReport =
+      DFSTestUtil.getAllBlocks(fs.open(filePath));
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("After mods: Number of blocks allocated " +
+          blocksAfterReport.size());
+    }
+
+    for (int i = 0; i < blocksAfterReport.size(); i++) {
+      ExtendedBlock b = blocksAfterReport.get(i).getBlock();
+      assertEquals("Length of " + i + "th block is incorrect",
+        oldLengths[i], b.getNumBytes());
+    }
+  }
+
+  /**
+   * Test write a file, verifies and closes it. Then a couple of random blocks
+   * is removed and BlockReport is forced; the FSNamesystem is pushed to
+   * recalculate required DN's activities such as replications and so on.
+   * The number of missing and under-replicated blocks should be the same in
+   * case of a single-DN cluster.
+   *
+   * @throws IOException in case of errors
+   */
+  @Test(timeout=300000)
+  public void blockReport_02() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    LOG.info("Running test " + METHOD_NAME);
+
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    DFSTestUtil.createFile(fs, filePath,
+      FILE_SIZE, REPL_FACTOR, rand.nextLong());
+
+    // mock around with newly created blocks and delete some
+    File dataDir = new File(cluster.getDataDirectory());
+    assertTrue(dataDir.isDirectory());
+
+    List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
+    List<Integer> removedIndex = new ArrayList<Integer>();
+    List<LocatedBlock> lBlocks =
+      cluster.getNameNodeRpc().getBlockLocations(
+          filePath.toString(), FILE_START,
+          FILE_SIZE).getLocatedBlocks();
+
+    while (removedIndex.size() != 2) {
+      int newRemoveIndex = rand.nextInt(lBlocks.size());
+      if (!removedIndex.contains(newRemoveIndex))
+        removedIndex.add(newRemoveIndex);
+    }
+
+    for (Integer aRemovedIndex : removedIndex) {
+      blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
+    }
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Number of blocks allocated " + lBlocks.size());
+    }
+
+    final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
+    for (ExtendedBlock b : blocks2Remove) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Removing the block " + b.getBlockName());
+      }
+      for (File f : findAllFiles(dataDir,
+        new MyFileFilter(b.getBlockName(), true))) {
+        DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
+        if (!f.delete()) {
+          LOG.warn("Couldn't delete " + b.getBlockName());
+        } else {
+          LOG.debug("Deleted file " + f.toString());
+        }
+      }
+    }
+
+    waitTil(DN_RESCAN_EXTRA_WAIT);
+
+    // all blocks belong to the same file, hence same BP
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
+    StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
+    sendBlockReports(dnR, poolId, reports);
+
+    BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
+        .getBlockManager());
+
+    printStats();
+
+    assertEquals("Wrong number of MissingBlocks is found",
+      blocks2Remove.size(), cluster.getNamesystem().getMissingBlocksCount());
+    assertEquals("Wrong number of UnderReplicatedBlocks is found",
+      blocks2Remove.size(), cluster.getNamesystem().getUnderReplicatedBlocks());
+  }
+
+
+  /**
+   * Test writes a file and closes it.
+   * Block reported is generated with a bad GS for a single block.
+   * Block report is forced and the check for # of corrupted blocks is performed.
+   *
+   * @throws IOException in case of an error
+   */
+  @Test(timeout=300000)
+  public void blockReport_03() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
+
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
+    sendBlockReports(dnR, poolId, reports);
+    printStats();
+
+    assertThat("Wrong number of corrupt blocks",
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
+    assertThat("Wrong number of PendingDeletion blocks",
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
+  }
+
+  /**
+   * Test writes a file and closes it.
+   * Block reported is generated with an extra block.
+   * Block report is forced and the check for # of pendingdeletion
+   * blocks is performed.
+   *
+   * @throws IOException in case of an error
+   */
+  @Test(timeout=300000)
+  public void blockReport_04() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    DFSTestUtil.createFile(fs, filePath,
+                           FILE_SIZE, REPL_FACTOR, rand.nextLong());
+
+
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    // all blocks belong to the same file, hence same BP
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+
+    // Create a bogus new block which will not be present on the namenode.
+    ExtendedBlock b = new ExtendedBlock(
+        poolId, rand.nextLong(), 1024L, rand.nextLong());
+    dn.getFSDataset().createRbw(b);
+
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
+    sendBlockReports(dnR, poolId, reports);
+    printStats();
+
+    assertThat("Wrong number of corrupt blocks",
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
+    assertThat("Wrong number of PendingDeletion blocks",
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
+  }
+
+  /**
+   * Test creates a file and closes it.
+   * The second datanode is started in the cluster.
+   * As soon as the replication process is completed test runs
+   * Block report and checks that no underreplicated blocks are left
+   *
+   * @throws IOException in case of an error
+   */
+  @Test(timeout=300000)
+  public void blockReport_06() throws Exception {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    final int DN_N1 = DN_N0 + 1;
+
+    writeFile(METHOD_NAME, FILE_SIZE, filePath);
+    startDNandWait(filePath, true);
+
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
+    sendBlockReports(dnR, poolId, reports);
+    printStats();
+    assertEquals("Wrong number of PendingReplication Blocks",
+      0, cluster.getNamesystem().getUnderReplicatedBlocks());
+  }
+
+  /**
+   * Similar to BlockReport_03() but works with two DNs
+   * Test writes a file and closes it.
+   * The second datanode is started in the cluster.
+   * As soon as the replication process is completed test finds a block from
+   * the second DN and sets its GS to be < of original one.
+   * Block report is forced and the check for # of corrupted blocks is performed.
+   * Another block is chosen and its length is set to a lesser than original.
+   * A check for another corrupted block is performed after yet another
+   * BlockReport
+   *
+   * @throws IOException in case of an error
+   */
+  @Test(timeout=300000)
+  public void blockReport_07() throws Exception {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    final int DN_N1 = DN_N0 + 1;
+
+    // write file and start second node to be "older" than the original
+    writeFile(METHOD_NAME, FILE_SIZE, filePath);
+    startDNandWait(filePath, true);
+
+    // all blocks belong to the same file, hence same BP
+    DataNode dn = cluster.getDataNodes().get(DN_N1);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
+    sendBlockReports(dnR, poolId, reports);
+    printStats();
+
+    assertThat("Wrong number of corrupt blocks",
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
+    assertThat("Wrong number of PendingDeletion blocks",
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
+    assertThat("Wrong number of PendingReplication blocks",
+               cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
+
+    reports = getBlockReports(dn, poolId, true, true);
+    sendBlockReports(dnR, poolId, reports);
+    printStats();
+
+    assertThat("Wrong number of corrupt blocks",
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(2L));
+    assertThat("Wrong number of PendingDeletion blocks",
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
+    assertThat("Wrong number of PendingReplication blocks",
+               cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
+
+    printStats();
+
+  }
+
+  /**
+   * The test set the configuration parameters for a large block size and
+   * restarts initiated single-node cluster.
+   * Then it writes a file > block_size and closes it.
+   * The second datanode is started in the cluster.
+   * As soon as the replication process is started and at least one TEMPORARY
+   * replica is found test forces BlockReport process and checks
+   * if the TEMPORARY replica isn't reported on it.
+   * Eventually, the configuration is being restored into the original state.
+   *
+   * @throws IOException in case of an error
+   */
+  @Test(timeout=300000)
+  public void blockReport_08() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    final int DN_N1 = DN_N0 + 1;
+    final int bytesChkSum = 1024 * 1000;
+
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
+    shutDownCluster();
+    startUpCluster();
+
+    try {
+      ArrayList<Block> blocks =
+        writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);
+      Block bl = findBlock(filePath, 12 * bytesChkSum);
+      BlockChecker bc = new BlockChecker(filePath);
+      bc.start();
+
+      waitForTempReplica(bl, DN_N1);
+
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
+      sendBlockReports(dnR, poolId, reports);
+      printStats();
+      assertEquals("Wrong number of PendingReplication blocks",
+        blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
+
+      try {
+        bc.join();
+      } catch (InterruptedException e) { }
+    } finally {
+      resetConfiguration(); // return the initial state of the configuration
+    }
+  }
+
+  // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's
+  // replica block. Expect the same behaviour: NN should simply ignore this
+  // block
+  @Test(timeout=300000)
+  public void blockReport_09() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    final int DN_N1 = DN_N0 + 1;
+    final int bytesChkSum = 1024 * 1000;
+
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
+    shutDownCluster();
+    startUpCluster();
+    // write file and start second node to be "older" than the original
+
+    try {
+      writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);
+
+      Block bl = findBlock(filePath, 12 * bytesChkSum);
+      BlockChecker bc = new BlockChecker(filePath);
+      bc.start();
+
+      waitForTempReplica(bl, DN_N1);
+
+      // all blocks belong to the same file, hence same BP
+      DataNode dn = cluster.getDataNodes().get(DN_N1);
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+      StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
+      sendBlockReports(dnR, poolId, reports);
+      printStats();
+      assertEquals("Wrong number of PendingReplication blocks",
+        2, cluster.getNamesystem().getPendingReplicationBlocks());
+
+      try {
+        bc.join();
+      } catch (InterruptedException e) {}
+    } finally {
+      resetConfiguration(); // return the initial state of the configuration
+    }
+  }
+
+  /**
+   * Test for the case where one of the DNs in the pipeline is in the
+   * process of doing a block report exactly when the block is closed.
+   * In this case, the block report becomes delayed until after the
+   * block is marked completed on the NN, and hence it reports an RBW
+   * replica for a COMPLETE block. Such a report should not be marked
+   * corrupt.
+   * This is a regression test for HDFS-2791.
+   */
+  @Test(timeout=300000)
+  public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
+    final CountDownLatch brFinished = new CountDownLatch(1);
+    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
+      @Override
+      protected Object passThrough(InvocationOnMock invocation)
+          throws Throwable {
+        try {
+          return super.passThrough(invocation);
+        } finally {
+          // inform the test that our block report went through.
+          brFinished.countDown();
+        }
+      }
+    };
+
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+
+    // Start a second DN for this test -- we're checking
+    // what happens when one of the DNs is slowed for some reason.
+    REPL_FACTOR = 2;
+    startDNandWait(null, false);
+
+    NameNode nn = cluster.getNameNode();
+
+    FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
+    try {
+      AppendTestUtil.write(out, 0, 10);
+      out.hflush();
+
+      // Set up a spy so that we can delay the block report coming
+      // from this node.
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeProtocolClientSideTranslatorPB spy =
+        DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+      Mockito.doAnswer(delayer)
+        .when(spy).blockReport(
+          Mockito.<DatanodeRegistration>anyObject(),
+          Mockito.anyString(),
+          Mockito.<StorageBlockReport[]>anyObject());
+
+      // Force a block report to be generated. The block report will have
+      // an RBW replica in it. Wait for the RPC to be sent, but block
+      // it before it gets to the NN.
+      dn.scheduleAllBlockReport(0);
+      delayer.waitForCall();
+
+    } finally {
+      IOUtils.closeStream(out);
+    }
+
+    // Now that the stream is closed, the NN will have the block in COMPLETE
+    // state.
+    delayer.proceed();
+    brFinished.await();
+
+    // Verify that no replicas are marked corrupt, and that the
+    // file is still readable.
+    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+    assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
+    DFSTestUtil.readFile(fs, filePath);
+
+    // Ensure that the file is readable even from the DN that we futzed with.
+    cluster.stopDataNode(1);
+    DFSTestUtil.readFile(fs, filePath);
+  }
+
+  private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
+    final boolean tooLongWait = false;
+    final int TIMEOUT = 40000;
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Wait for datanode " + DN_N1 + " to appear");
+    }
+    while (cluster.getDataNodes().size() <= DN_N1) {
+      waitTil(20);
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
+    }
+    cluster.waitActive();
+
+    // Look about specified DN for the replica of the block from 1st DN
+    final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
+    long start = Time.now();
+    int count = 0;
+    while (r == null) {
+      waitTil(5);
+      r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
+      long waiting_period = Time.now() - start;
+      if (count++ % 100 == 0)
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Has been waiting for " + waiting_period + " ms.");
+        }
+      if (waiting_period > TIMEOUT)
+        assertTrue("Was waiting too long to get ReplicaInfo from a datanode",
+          tooLongWait);
+    }
+
+    HdfsServerConstants.ReplicaState state = r.getState();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Replica state before the loop " + state.getValue());
+    }
+    start = Time.now();
+    while (state != HdfsServerConstants.ReplicaState.TEMPORARY) {
+      waitTil(5);
+      state = r.getState();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Keep waiting for " + bl.getBlockName() +
+            " is in state " + state.getValue());
+      }
+      if (Time.now() - start > TIMEOUT)
+        assertTrue("Was waiting too long for a replica to become TEMPORARY",
+          tooLongWait);
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Replica state after the loop " + state.getValue());
+    }
+  }
+
+  // Helper methods from here below...
+  // Write file and start second data node.
+  private ArrayList<Block> writeFile(final String METHOD_NAME,
+                                               final long fileSize,
+                                               Path filePath) {
+    ArrayList<Block> blocks = null;
+    try {
+      REPL_FACTOR = 2;
+      blocks = prepareForRide(filePath, METHOD_NAME, fileSize);
+    } catch (IOException e) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Caught exception ", e);
+      }
+    }
+    return blocks;
+  }
+
+  private void startDNandWait(Path filePath, boolean waitReplicas)
+      throws IOException, InterruptedException, TimeoutException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Before next DN start: " + cluster.getDataNodes().size());
+    }
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitClusterUp();
+    ArrayList<DataNode> datanodes = cluster.getDataNodes();
+    assertEquals(datanodes.size(), 2);
+
+    if (LOG.isDebugEnabled()) {
+      int lastDn = datanodes.size() - 1;
+      LOG.debug("New datanode "
+          + cluster.getDataNodes().get(lastDn).getDisplayName()
+          + " has been started");
+    }
+    if (waitReplicas) {
+      DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
+    }
+  }
+
+  private ArrayList<Block> prepareForRide(final Path filePath,
+                                          final String METHOD_NAME,
+                                          long fileSize) throws IOException {
+    LOG.info("Running test " + METHOD_NAME);
+
+    DFSTestUtil.createFile(fs, filePath, fileSize,
+      REPL_FACTOR, rand.nextLong());
+
+    return locatedToBlocks(cluster.getNameNodeRpc()
+      .getBlockLocations(filePath.toString(), FILE_START,
+        fileSize).getLocatedBlocks(), null);
+  }
+
+  private void printStats() {
+    BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Missing " + cluster.getNamesystem().getMissingBlocksCount());
+      LOG.debug("Corrupted " + cluster.getNamesystem().getCorruptReplicaBlocks());
+      LOG.debug("Under-replicated " + cluster.getNamesystem().
+          getUnderReplicatedBlocks());
+      LOG.debug("Pending delete " + cluster.getNamesystem().
+          getPendingDeletionBlocks());
+      LOG.debug("Pending replications " + cluster.getNamesystem().
+          getPendingReplicationBlocks());
+      LOG.debug("Excess " + cluster.getNamesystem().getExcessBlocks());
+      LOG.debug("Total " + cluster.getNamesystem().getBlocksTotal());
+    }
+  }
+
+  private ArrayList<Block> locatedToBlocks(final List<LocatedBlock> locatedBlks,
+                                           List<Integer> positionsToRemove) {
+    ArrayList<Block> newList = new ArrayList<Block>();
+    for (int i = 0; i < locatedBlks.size(); i++) {
+      if (positionsToRemove != null && positionsToRemove.contains(i)) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(i + " block to be omitted");
+        }
+        continue;
+      }
+      newList.add(new Block(locatedBlks.get(i).getBlock().getLocalBlock()));
+    }
+    return newList;
+  }
+
+  private void waitTil(long waitPeriod) {
+    try { //Wait til next re-scan
+      Thread.sleep(waitPeriod);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private List<File> findAllFiles(File top, FilenameFilter mask) {
+    if (top == null) return null;
+    ArrayList<File> ret = new ArrayList<File>();
+    for (File f : top.listFiles()) {
+      if (f.isDirectory())
+        ret.addAll(findAllFiles(f, mask));
+      else if (mask.accept(f, f.getName()))
+        ret.add(f);
+    }
+    return ret;
+  }
+
+  private class MyFileFilter implements FilenameFilter {
+    private String nameToAccept = "";
+    private boolean all = false;
+
+    public MyFileFilter(String nameToAccept, boolean all) {
+      if (nameToAccept == null)
+        throw new IllegalArgumentException("Argument isn't suppose to be null");
+      this.nameToAccept = nameToAccept;
+      this.all = all;
+    }
+
+    @Override
+    public boolean accept(File file, String s) {
+      if (all)
+        return s != null && s.startsWith(nameToAccept);
+      else
+        return s != null && s.equals(nameToAccept);
+    }
+  }
+
+  private static void initLoggers() {
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) BlockReportTestBase.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private Block findBlock(Path path, long size) throws IOException {
+    List<LocatedBlock> lbs =
+      cluster.getNameNodeRpc()
+        .getBlockLocations(path.toString(), FILE_START, size)
+        .getLocatedBlocks();
+    LocatedBlock lb = lbs.get(lbs.size() - 1);
+
+    // Get the block from the first DN
+    return cluster.getDataNodes().get(DN_N0).data
+      .getStoredBlock(lb.getBlock().getBlockPoolId(),
+          lb.getBlock().getBlockId());
+  }
+
+  private class BlockChecker extends Thread {
+    Path filePath;
+
+    public BlockChecker(final Path filePath) {
+      this.filePath = filePath;
+    }
+
+    @Override
+    public void run() {
+      try {
+        startDNandWait(filePath, true);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail("Failed to start BlockChecker: " + e);
+      }
+    }
+  }
+}
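The findAllFiles/MyFileFilter pair above lets the base class walk a directory tree and pick out files either by exact name or by name prefix (for example, on-disk block files under a DataNode data directory). A self-contained sketch of the same idea follows; the directory path and the "blk_" prefix are illustrative assumptions, not values taken from this patch.

    import java.io.File;
    import java.io.FilenameFilter;
    import java.util.ArrayList;
    import java.util.List;

    public class FindFilesSketch {
      /** Recursively collect files accepted by the filter, mirroring the
       *  approach of BlockReportTestBase#findAllFiles (with a null check
       *  for unreadable or non-directory entries). */
      static List<File> findAllFiles(File top, FilenameFilter mask) {
        List<File> ret = new ArrayList<File>();
        File[] children = top.listFiles();
        if (children == null) {
          return ret;                    // not a directory or not readable
        }
        for (File f : children) {
          if (f.isDirectory()) {
            ret.addAll(findAllFiles(f, mask));
          } else if (mask.accept(f.getParentFile(), f.getName())) {
            ret.add(f);
          }
        }
        return ret;
      }

      public static void main(String[] args) {
        // Prefix match, analogous to new MyFileFilter("blk_", true):
        // accept every file whose name starts with "blk_".
        FilenameFilter blockFiles = new FilenameFilter() {
          @Override
          public boolean accept(File dir, String name) {
            return name != null && name.startsWith("blk_");
          }
        };
        // The path is a placeholder; point it at a real DataNode data dir.
        List<File> found = findAllFiles(new File("/tmp/dfs/data"), blockFiles);
        System.out.println("Matched " + found.size() + " block files");
      }
    }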

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java?rev=1563254&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java Fri Jan 31 21:00:33 2014
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests that the DataNode respects
+ * {@link DFSConfigKeys#DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY}
+ */
+public class TestDnRespectsBlockReportSplitThreshold {
+  public static final Log LOG =
+      LogFactory.getLog(TestDnRespectsBlockReportSplitThreshold.class);
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final short REPL_FACTOR = 1;
+  private static final long seed = 0xFEEDFACE;
+  private static final int BLOCKS_IN_FILE = 5;
+
+  private static Configuration conf;
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  static String bpid;
+
+  public void startUpCluster(long splitThreshold) throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, splitThreshold);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(REPL_FACTOR)
+        .build();
+    fs = cluster.getFileSystem();
+    bpid = cluster.getNamesystem().getBlockPoolId();
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if (cluster != null) {
+      fs.close();
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFile(String filenamePrefix, int blockCount)
+      throws IOException {
+    Path path = new Path("/" + filenamePrefix + ".dat");
+    DFSTestUtil.createFile(fs, path, BLOCK_SIZE,
+        blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
+  }
+
+  private void verifyCapturedArguments(
+      ArgumentCaptor<StorageBlockReport[]> captor,
+      int expectedReportsPerCall,
+      int expectedTotalBlockCount) {
+
+    List<StorageBlockReport[]> listOfReports = captor.getAllValues();
+    int numBlocksReported = 0;
+    for (StorageBlockReport[] reports : listOfReports) {
+      assertThat(reports.length, is(expectedReportsPerCall));
+
+      for (StorageBlockReport report : reports) {
+        BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks());
+        numBlocksReported += blockList.getNumberOfBlocks();
+      }
+    }
+
+    assertTrue(numBlocksReported >= expectedTotalBlockCount);
+  }
+
+  /**
+   * Test that if splitThreshold is zero, then we always get a separate
+   * call per storage.
+   */
+  @Test(timeout=300000)
+  public void testAlwaysSplit() throws IOException, InterruptedException {
+    startUpCluster(0);
+    NameNode nn = cluster.getNameNode();
+    DataNode dn = cluster.getDataNodes().get(0);
+
+    // Create a file with a few blocks.
+    createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
+
+    // Insert a spy object for the NN RPC.
+    DatanodeProtocolClientSideTranslatorPB nnSpy =
+        DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+    // Trigger a block report so there is an interaction with the spy
+    // object.
+    DataNodeTestUtils.triggerBlockReport(dn);
+
+    ArgumentCaptor<StorageBlockReport[]> captor =
+        ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+    Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
+        any(DatanodeRegistration.class),
+        anyString(),
+        captor.capture());
+
+    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+  }
+
+  /**
+   * Tests the behavior when the count of blocks is exactly one less than
+   * the threshold.
+   */
+  @Test(timeout=300000)
+  public void testCornerCaseUnderThreshold() throws IOException, InterruptedException {
+    startUpCluster(BLOCKS_IN_FILE + 1);
+    NameNode nn = cluster.getNameNode();
+    DataNode dn = cluster.getDataNodes().get(0);
+
+    // Create a file with a few blocks.
+    createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
+
+    // Insert a spy object for the NN RPC.
+    DatanodeProtocolClientSideTranslatorPB nnSpy =
+        DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+    // Trigger a block report so there is an interaction with the spy
+    // object.
+    DataNodeTestUtils.triggerBlockReport(dn);
+
+    ArgumentCaptor<StorageBlockReport[]> captor =
+        ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+    Mockito.verify(nnSpy, times(1)).blockReport(
+        any(DatanodeRegistration.class),
+        anyString(),
+        captor.capture());
+
+    verifyCapturedArguments(captor, MiniDFSCluster.DIRS_PER_DATANODE, BLOCKS_IN_FILE);
+  }
+
+  /**
+   * Tests the behavior when the count of blocks is exactly equal to the
+   * threshold.
+   */
+  @Test(timeout=300000)
+  public void testCornerCaseAtThreshold() throws IOException, InterruptedException {
+    startUpCluster(BLOCKS_IN_FILE);
+    NameNode nn = cluster.getNameNode();
+    DataNode dn = cluster.getDataNodes().get(0);
+
+    // Create a file with a few blocks.
+    createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
+
+    // Insert a spy object for the NN RPC.
+    DatanodeProtocolClientSideTranslatorPB nnSpy =
+        DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+    // Trigger a block report so there is an interaction with the spy
+    // object.
+    DataNodeTestUtils.triggerBlockReport(dn);
+
+    ArgumentCaptor<StorageBlockReport[]> captor =
+        ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+    Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
+        any(DatanodeRegistration.class),
+        anyString(),
+        captor.capture());
+
+    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+  }
+
+}
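Taken together, the three tests above pin down the threshold semantics: a threshold of zero always produces one RPC per storage, a block count strictly below the threshold produces a single combined RPC, and a block count equal to the threshold splits again. A minimal standalone sketch of that decision rule follows; the class and method names are hypothetical and do not appear in this patch.

    // Illustrative sketch only: restates the rule exercised by
    // testAlwaysSplit, testCornerCaseUnderThreshold and
    // testCornerCaseAtThreshold.
    public class SplitDecisionSketch {
      /** Split into per-storage RPCs unless the total block count is
       *  strictly below the configured threshold. A threshold of zero
       *  therefore always splits. */
      static boolean shouldSplitReports(long splitThreshold, long totalBlocks) {
        return totalBlocks >= splitThreshold;
      }

      public static void main(String[] args) {
        System.out.println(shouldSplitReports(0, 5));  // true  -> per-storage RPCs
        System.out.println(shouldSplitReports(6, 5));  // false -> one combined RPC
        System.out.println(shouldSplitReports(5, 5));  // true  -> per-storage RPCs
      }
    }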

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java?rev=1563254&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java Fri Jan 31 21:00:33 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+
+
+/**
+ * Runs all tests in BlockReportTestBase, sending one block report per
+ * storage in a separate RPC. This is the default DataNode behavior
+ * post HDFS-2832.
+ */
+public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
+
+  @Override
+  protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
+      StorageBlockReport[] reports) throws IOException {
+    for (StorageBlockReport report : reports) {
+      LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
+      StorageBlockReport[] singletonReport = { report };
+      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java?rev=1563254&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java Fri Jan 31 21:00:33 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+
+/**
+ * Runs all tests in BlockReportTestBase, sending all storage reports in
+ * a single combined RPC per DataNode. This tests that the NN can handle
+ * the legacy DN behavior where it presents itself as a single logical
+ * storage.
+ */
+public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
+
+  @Override
+  protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
+                                  StorageBlockReport[] reports) throws IOException {
+    LOG.info("Sending combined block reports for " + dnR);
+    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+  }
+}
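As a usage note, the combined behavior exercised by this test can also be approximated from configuration by raising the split threshold well above any expected per-DataNode block count. The value below is an assumption chosen only for illustration; the patch itself sets the key to small per-test values through conf.setLong in startUpCluster.

    // Sketch under assumptions: raising DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY
    // far above any realistic per-DataNode block count should keep block
    // reports combined into a single RPC. Long.MAX_VALUE is an illustrative
    // choice, not a value used anywhere in this patch.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;

    public class CombinedReportConfigSketch {
      public static Configuration newConf() {
        Configuration conf = new HdfsConfiguration();
        conf.setLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, Long.MAX_VALUE);
        return conf;
      }
    }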


