hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [21/50] [abbrv] hadoop git commit: HDFS-13947. Review of DirectoryScanner Class. Contributed by BELUGA BEHR.
Date Fri, 05 Oct 2018 22:06:50 GMT
HDFS-13947. Review of DirectoryScanner Class. Contributed by BELUGA BEHR.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1dc0adfa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1dc0adfa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1dc0adfa

Branch: refs/heads/HDFS-12943
Commit: 1dc0adfac0ee4821c67366728c70be9b59477b0f
Parents: 7051bd7
Author: Inigo Goiri <inigoiri@apache.org>
Authored: Wed Oct 3 11:19:57 2018 -0700
Committer: Inigo Goiri <inigoiri@apache.org>
Committed: Wed Oct 3 11:19:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   2 +-
 .../hdfs/server/datanode/DirectoryScanner.java  | 601 ++++++++++---------
 .../server/datanode/TestDirectoryScanner.java   | 243 ++++----
 .../fsdataset/impl/TestProvidedImpl.java        |  13 +-
 .../namenode/TestListCorruptFileBlocks.java     |  50 +-
 6 files changed, 481 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d8024dc..42709de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -709,7 +709,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
       "dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
   public static final int
-      DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000;
+      DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = -1;
   public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String  DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String  DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 270e30b..40f80a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1086,7 +1086,7 @@ public class DataNode extends ReconfigurableBase
       reason = "verifcation is not supported by SimulatedFSDataset";
     } 
     if (reason == null) {
-      directoryScanner = new DirectoryScanner(this, data, conf);
+      directoryScanner = new DirectoryScanner(data, conf);
       directoryScanner.start();
     } else {
       LOG.info("Periodic Directory Tree Verification scan " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 99584d9..484899d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -17,17 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,23 +38,27 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
-import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 
 /**
  * Periodically scans the data directories for block and block metadata files.
@@ -62,48 +68,48 @@ import org.apache.hadoop.util.Time;
 public class DirectoryScanner implements Runnable {
   private static final Logger LOG =
       LoggerFactory.getLogger(DirectoryScanner.class);
-  private static final int MILLIS_PER_SECOND = 1000;
-  private static final String START_MESSAGE =
-      "Periodic Directory Tree Verification scan"
-      + " starting at %s with interval of %dms";
-  private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
-      + " and throttle limit of %dms/s";
+
+  private static final int DEFAULT_MAP_SIZE = 32768;
 
   private final FsDatasetSpi<?> dataset;
   private final ExecutorService reportCompileThreadPool;
   private final ScheduledExecutorService masterThread;
   private final long scanPeriodMsecs;
-  private final int throttleLimitMsPerSec;
-  private volatile boolean shouldRun = false;
+  private final long throttleLimitMsPerSec;
+  private final AtomicBoolean shouldRun = new AtomicBoolean();
+
   private boolean retainDiffs = false;
-  private final DataNode datanode;
 
   /**
    * Total combined wall clock time (in milliseconds) spent by the report
-   * compiler threads executing.  Used for testing purposes.
+   * compiler threads executing. Used for testing purposes.
    */
   @VisibleForTesting
   final AtomicLong timeRunningMs = new AtomicLong(0L);
+
   /**
    * Total combined wall clock time (in milliseconds) spent by the report
-   * compiler threads blocked by the throttle.  Used for testing purposes.
+   * compiler threads blocked by the throttle. Used for testing purposes.
    */
   @VisibleForTesting
   final AtomicLong timeWaitingMs = new AtomicLong(0L);
+
   /**
    * The complete list of block differences indexed by block pool ID.
    */
   @VisibleForTesting
-  final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
+  final BlockPoolReport diffs = new BlockPoolReport();
+
   /**
-   * Statistics about the block differences in each blockpool, indexed by
-   * block pool ID.
+   * Statistics about the block differences in each blockpool, indexed by block
+   * pool ID.
    */
   @VisibleForTesting
-  final Map<String, Stats> stats = new HashMap<String, Stats>();
-  
+  final Map<String, Stats> stats;
+
   /**
-   * Allow retaining diffs for unit test and analysis. Defaults to false (off)
+   * Allow retaining diffs for unit test and analysis. Defaults to false (off).
+   *
    * @param b whether to retain diffs
    */
   @VisibleForTesting
@@ -123,92 +129,157 @@ public class DirectoryScanner implements Runnable {
     long missingMemoryBlocks = 0;
     long mismatchBlocks = 0;
     long duplicateBlocks = 0;
-    
+
     /**
      * Create a new Stats object for the given blockpool ID.
+     *
      * @param bpid blockpool ID
      */
     public Stats(String bpid) {
       this.bpid = bpid;
     }
-    
+
     @Override
     public String toString() {
-      return "BlockPool " + bpid
-      + " Total blocks: " + totalBlocks + ", missing metadata files:"
-      + missingMetaFile + ", missing block files:" + missingBlockFile
-      + ", missing blocks in memory:" + missingMemoryBlocks
-      + ", mismatched blocks:" + mismatchBlocks;
+      return "BlockPool " + bpid + " Total blocks: " + totalBlocks
+          + ", missing metadata files: " + missingMetaFile
+          + ", missing block files: " + missingBlockFile
+          + ", missing blocks in memory: " + missingMemoryBlocks
+          + ", mismatched blocks: " + mismatchBlocks;
     }
   }
 
   /**
    * Helper class for compiling block info reports from report compiler threads.
+   * Contains a volume, a set of block pool IDs, and a collection of ScanInfo
+   * objects. If a block pool exists but has no ScanInfo objects associated with
+   * it, there will be no mapping for that particular block pool.
    */
-  static class ScanInfoPerBlockPool extends 
-                     HashMap<String, LinkedList<ScanInfo>> {
-    
+  @VisibleForTesting
+  public static class ScanInfoVolumeReport {
+
+    @SuppressWarnings("unused")
     private static final long serialVersionUID = 1L;
 
+    private final FsVolumeSpi volume;
+
+    private final BlockPoolReport blockPoolReport;
+
     /**
      * Create a new info list.
+     *
+     * @param volume the volume this report describes
      */
-    ScanInfoPerBlockPool() {super();}
+    ScanInfoVolumeReport(final FsVolumeSpi volume) {
+      this.volume = volume;
+      this.blockPoolReport = new BlockPoolReport();
+    }
 
     /**
      * Create a new info list initialized to the given expected size.
-     * See {@link java.util.HashMap#HashMap(int)}.
      *
      * @param sz initial expected size
      */
-    ScanInfoPerBlockPool(int sz) {super(sz);}
-    
+    ScanInfoVolumeReport(final FsVolumeSpi volume,
+        final Collection<String> blockPools) {
+      this.volume = volume;
+      this.blockPoolReport = new BlockPoolReport(blockPools);
+    }
+
+    public void addAll(final String bpid,
+        final Collection<ScanInfo> scanInfos) {
+      this.blockPoolReport.addAll(bpid, scanInfos);
+    }
+
+    public Set<String> getBlockPoolIds() {
+      return this.blockPoolReport.getBlockPoolIds();
+    }
+
+    public List<ScanInfo> getScanInfo(final String bpid) {
+      return this.blockPoolReport.getScanInfo(bpid);
+    }
+
+    public FsVolumeSpi getVolume() {
+      return volume;
+    }
+
+    @Override
+    public String toString() {
+      return "ScanInfoVolumeReport [volume=" + volume + ", blockPoolReport="
+          + blockPoolReport + "]";
+    }
+  }
+
+  /**
+   * Helper class for compiling block info reports per block pool.
+   */
+  @VisibleForTesting
+  public static class BlockPoolReport {
+
+    @SuppressWarnings("unused")
+    private static final long serialVersionUID = 1L;
+
+    private final Set<String> blockPools;
+
+    private final ListMultimap<String, ScanInfo> map;
+
     /**
-     * Merges {@code that} ScanInfoPerBlockPool into this one
+     * Create a block pool report.
      *
-     * @param that ScanInfoPerBlockPool to merge
+     * The report starts empty, with no known block pools.
      */
-    public void addAll(ScanInfoPerBlockPool that) {
-      if (that == null) return;
-      
-      for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) {
-        String bpid = entry.getKey();
-        LinkedList<ScanInfo> list = entry.getValue();
-        
-        if (this.containsKey(bpid)) {
-          //merge that per-bpid linked list with this one
-          this.get(bpid).addAll(list);
-        } else {
-          //add that new bpid and its linked list to this
-          this.put(bpid, list);
-        }
-      }
+    BlockPoolReport() {
+      this.blockPools = new HashSet<>(2);
+      this.map = ArrayListMultimap.create(2, DEFAULT_MAP_SIZE);
     }
-    
+
     /**
-     * Convert all the LinkedList values in this ScanInfoPerBlockPool map
-     * into sorted arrays, and return a new map of these arrays per blockpool
+     * Create a new block pool report seeded with the given block pool IDs.
      *
-     * @return a map of ScanInfo arrays per blockpool
+     * @param blockPools initial list of known block pools
      */
-    public Map<String, ScanInfo[]> toSortedArrays() {
-      Map<String, ScanInfo[]> result = 
-        new HashMap<String, ScanInfo[]>(this.size());
-      
-      for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) {
-        String bpid = entry.getKey();
-        LinkedList<ScanInfo> list = entry.getValue();
-        
-        // convert list to array
-        ScanInfo[] record = list.toArray(new ScanInfo[list.size()]);
+    BlockPoolReport(final Collection<String> blockPools) {
+      this.blockPools = new HashSet<>(blockPools);
+      this.map = ArrayListMultimap.create(blockPools.size(), DEFAULT_MAP_SIZE);
+
+    }
+
+    public void addAll(final String bpid,
+        final Collection<ScanInfo> scanInfos) {
+      this.blockPools.add(bpid);
+      this.map.putAll(bpid, scanInfos);
+    }
+
+    public void sortBlocks() {
+      for (final String bpid : this.map.keySet()) {
+        final List<ScanInfo> list = this.map.get(bpid);
         // Sort array based on blockId
-        Arrays.sort(record);
-        result.put(bpid, record);            
+        Collections.sort(list);
       }
-      return result;
     }
-  }
 
+    public Set<String> getBlockPoolIds() {
+      return Collections.unmodifiableSet(this.blockPools);
+    }
+
+    public List<ScanInfo> getScanInfo(final String bpid) {
+      return this.map.get(bpid);
+    }
+
+    public Collection<Map.Entry<String, ScanInfo>> getEntries() {
+      return Collections.unmodifiableCollection(this.map.entries());
+    }
+
+    public void clear() {
+      this.map.clear();
+      this.blockPools.clear();
+    }
+
+    @Override
+    public String toString() {
+      return "BlockPoolReport [blockPools=" + blockPools + ", map=" + map + "]";
+    }
+  }
 
   /**
    * Create a new directory scanner, but don't cycle it running yet.
@@ -217,75 +288,58 @@ public class DirectoryScanner implements Runnable {
    * @param dataset the dataset to scan
    * @param conf the Configuration object
    */
-  public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
-      Configuration conf) {
-    this.datanode = datanode;
+  public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
     this.dataset = dataset;
+    this.stats = new HashMap<>(DEFAULT_MAP_SIZE);
     int interval = (int) conf.getTimeDuration(
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT,
         TimeUnit.SECONDS);
-    scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
 
-    int throttle =
-        conf.getInt(
-          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
-          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
+    scanPeriodMsecs = TimeUnit.SECONDS.toMillis(interval);
 
-    if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
-      if (throttle > MILLIS_PER_SECOND) {
-        LOG.error(
-            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
-            + " set to value above 1000 ms/sec. Assuming default value of " +
-            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
-      } else {
-        LOG.error(
-            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
-            + " set to value below 1 ms/sec. Assuming default value of " +
-            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
-      }
+    int throttle = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
 
-      throttleLimitMsPerSec =
+    if (throttle >= TimeUnit.SECONDS.toMillis(1)) {
+      LOG.warn(
+          "{} set to value above 1000 ms/sec. Assuming default value of {}",
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
+      throttle =
           DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
-    } else {
-      throttleLimitMsPerSec = throttle;
     }
 
-    int threads = 
+    throttleLimitMsPerSec = throttle;
+
+    int threads =
         conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
-                    DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
+            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
 
-    reportCompileThreadPool = Executors.newFixedThreadPool(threads, 
-        new Daemon.DaemonFactory());
-    masterThread = new ScheduledThreadPoolExecutor(1,
-        new Daemon.DaemonFactory());
+    reportCompileThreadPool =
+        Executors.newFixedThreadPool(threads, new Daemon.DaemonFactory());
+
+    masterThread =
+        new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());
   }
 
   /**
-   * Start the scanner.  The scanner will run every
+   * Start the scanner. The scanner will run every
    * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
    */
   void start() {
-    shouldRun = true;
-    long offset = ThreadLocalRandom.current().nextInt(
-        (int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
-    long firstScanTime = Time.now() + offset;
-    String logMsg;
-
-    if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
-      logMsg = String.format(START_MESSAGE_WITH_THROTTLE,
-          FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs,
-          throttleLimitMsPerSec);
-    } else {
-      logMsg = String.format(START_MESSAGE,
-          FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs);
-    }
-
-    LOG.info(logMsg);
-    masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, 
-                                     TimeUnit.MILLISECONDS);
+    shouldRun.set(true);
+    long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
+
+    LOG.info(
+        "Periodic Directory Tree Verification scan starting in {}ms with interval of {}ms and throttle limit of {}ms/s",
+        firstScanTime, scanPeriodMsecs, throttleLimitMsPerSec);
+
+    masterThread.scheduleAtFixedRate(this, firstScanTime, scanPeriodMsecs,
+        TimeUnit.MILLISECONDS);
   }
-  
+
   /**
    * Return whether the scanner has been started.
    *
@@ -293,7 +347,7 @@ public class DirectoryScanner implements Runnable {
    */
   @VisibleForTesting
   boolean getRunStatus() {
-    return shouldRun;
+    return shouldRun.get();
   }
 
   /**
@@ -305,67 +359,69 @@ public class DirectoryScanner implements Runnable {
   }
 
   /**
-   * Main program loop for DirectoryScanner.  Runs {@link reconcile()}
-   * and handles any exceptions.
+   * Main program loop for DirectoryScanner. Runs {@link #reconcile()} and
+   * handles any exceptions.
    */
   @Override
   public void run() {
+    if (!shouldRun.get()) {
+      // shutdown has been activated
+      LOG.warn(
+          "This cycle terminating immediately because 'shouldRun' has been deactivated");
+      return;
+    }
     try {
-      if (!shouldRun) {
-        //shutdown has been activated
-        LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated");
-        return;
-      }
-
-      //We're are okay to run - do it
-      reconcile();      
-      
+      reconcile();
     } catch (Exception e) {
-      //Log and continue - allows Executor to run again next cycle
-      LOG.error("Exception during DirectoryScanner execution - will continue next cycle", e);
+      // Log and continue - allows Executor to run again next cycle
+      LOG.error(
+          "Exception during DirectoryScanner execution - will continue next cycle",
+          e);
     } catch (Error er) {
-      //Non-recoverable error - re-throw after logging the problem
-      LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", er);
+      // Non-recoverable error - re-throw after logging the problem
+      LOG.error(
+          "System Error during DirectoryScanner execution - permanently terminating periodic scanner",
+          er);
       throw er;
     }
   }
-  
+
   /**
-   * Stops the directory scanner.  This method will wait for 1 minute for the
+   * Stops the directory scanner. This method will wait for 1 minute for the
    * main thread to exit and an additional 1 minute for the report compilation
-   * threads to exit.  If a thread does not exit in that time period, it is
-   * left running, and an error is logged.
+   * threads to exit. If a thread does not exit in that time period, it is left
+   * running, and an error is logged.
    */
   void shutdown() {
-    if (!shouldRun) {
-      LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
-    } else {
-      LOG.warn("DirectoryScanner: shutdown has been called");      
+    LOG.info("Shutdown has been called");
+    if (!shouldRun.getAndSet(false)) {
+      LOG.warn("Shutdown has been called, but periodic scanner not started");
+    }
+    if (masterThread != null) {
+      masterThread.shutdown();
     }
-    shouldRun = false;
-    if (masterThread != null) masterThread.shutdown();
-
     if (reportCompileThreadPool != null) {
       reportCompileThreadPool.shutdownNow();
     }
-
     if (masterThread != null) {
       try {
         masterThread.awaitTermination(1, TimeUnit.MINUTES);
       } catch (InterruptedException e) {
-        LOG.error("interrupted while waiting for masterThread to " +
-          "terminate", e);
+        LOG.error(
+            "interrupted while waiting for masterThread to " + "terminate", e);
       }
     }
     if (reportCompileThreadPool != null) {
       try {
         reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
       } catch (InterruptedException e) {
-        LOG.error("interrupted while waiting for reportCompileThreadPool to " +
-          "terminate", e);
+        LOG.error("interrupted while waiting for reportCompileThreadPool to "
+            + "terminate", e);
       }
     }
-    if (!retainDiffs) clear();
+    if (!retainDiffs) {
+      clear();
+    }
   }
 
   /**
@@ -374,45 +430,54 @@ public class DirectoryScanner implements Runnable {
   @VisibleForTesting
   public void reconcile() throws IOException {
     scan();
-    for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
-      String bpid = entry.getKey();
-      LinkedList<ScanInfo> diff = entry.getValue();
-      
-      for (ScanInfo info : diff) {
-        dataset.checkAndUpdate(bpid, info);
-      }
+
+    for (final Map.Entry<String, ScanInfo> entry : diffs.getEntries()) {
+      dataset.checkAndUpdate(entry.getKey(), entry.getValue());
+    }
+
+    if (!retainDiffs) {
+      clear();
     }
-    if (!retainDiffs) clear();
   }
 
   /**
-   * Scan for the differences between disk and in-memory blocks
-   * Scan only the "finalized blocks" lists of both disk and memory.
+   * Scan for the differences between disk and in-memory blocks. Scan only the
+   * "finalized blocks" lists of both disk and memory.
    */
   private void scan() {
+    BlockPoolReport blockPoolReport = new BlockPoolReport();
+
     clear();
-    Map<String, ScanInfo[]> diskReport = getDiskReport();
+
+    Collection<ScanInfoVolumeReport> volumeReports = getVolumeReports();
+    for (ScanInfoVolumeReport volumeReport : volumeReports) {
+      for (String blockPoolId : volumeReport.getBlockPoolIds()) {
+        List<ScanInfo> scanInfos = volumeReport.getScanInfo(blockPoolId);
+        blockPoolReport.addAll(blockPoolId, scanInfos);
+      }
+    }
+
+    // Pre-sort the reports outside of the lock
+    blockPoolReport.sortBlocks();
 
     // Hold FSDataset lock to prevent further changes to the block map
-    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
-      for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
-        String bpid = entry.getKey();
-        ScanInfo[] blockpoolReport = entry.getValue();
-        
+    try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
+      for (final String bpid : blockPoolReport.getBlockPoolIds()) {
+        List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
+
         Stats statsRecord = new Stats(bpid);
         stats.put(bpid, statsRecord);
-        LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
-        diffs.put(bpid, diffRecord);
-        
-        statsRecord.totalBlocks = blockpoolReport.length;
+        Collection<ScanInfo> diffRecord = new ArrayList<>();
+
+        statsRecord.totalBlocks = blockpoolReport.size();
         final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
         Collections.sort(bl); // Sort based on blockId
-  
+
         int d = 0; // index for blockpoolReport
         int m = 0; // index for memReprot
-        while (m < bl.size() && d < blockpoolReport.length) {
+        while (m < bl.size() && d < blockpoolReport.size()) {
           ReplicaInfo memBlock = bl.get(m);
-          ScanInfo info = blockpoolReport[d];
+          ScanInfo info = blockpoolReport.get(d);
           if (info.getBlockId() < memBlock.getBlockId()) {
             if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
               // Block is missing in memory
@@ -424,15 +489,15 @@ public class DirectoryScanner implements Runnable {
           }
           if (info.getBlockId() > memBlock.getBlockId()) {
             // Block is missing on the disk
-            addDifference(diffRecord, statsRecord,
-                          memBlock.getBlockId(), info.getVolume());
+            addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
+                info.getVolume());
             m++;
             continue;
           }
           // Block file and/or metadata file exists on the disk
           // Block exists in memory
-          if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
-              info.getBlockFile() == null) {
+          if (info.getVolume().getStorageType() != StorageType.PROVIDED
+              && info.getBlockFile() == null) {
             // Block metadata file exits and block file is missing
             addDifference(diffRecord, statsRecord, info);
           } else if (info.getGenStamp() != memBlock.getGenerationStamp()
@@ -442,16 +507,16 @@ public class DirectoryScanner implements Runnable {
             statsRecord.mismatchBlocks++;
             addDifference(diffRecord, statsRecord, info);
           } else if (memBlock.compareWith(info) != 0) {
-            // volumeMap record and on-disk files don't match.
+            // volumeMap record and on-disk files do not match.
             statsRecord.duplicateBlocks++;
             addDifference(diffRecord, statsRecord, info);
           }
           d++;
 
-          if (d < blockpoolReport.length) {
-            // There may be multiple on-disk records for the same block, don't increment
-            // the memory record pointer if so.
-            ScanInfo nextInfo = blockpoolReport[d];
+          if (d < blockpoolReport.size()) {
+            // There may be multiple on-disk records for the same block, do not
+            // increment the memory record pointer if so.
+            ScanInfo nextInfo = blockpoolReport.get(d);
             if (nextInfo.getBlockId() != info.getBlockId()) {
               ++m;
             }
@@ -461,132 +526,108 @@ public class DirectoryScanner implements Runnable {
         }
         while (m < bl.size()) {
           ReplicaInfo current = bl.get(m++);
-          addDifference(diffRecord, statsRecord,
-                        current.getBlockId(), current.getVolume());
+          addDifference(diffRecord, statsRecord, current.getBlockId(),
+              current.getVolume());
         }
-        while (d < blockpoolReport.length) {
-          if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
+        while (d < blockpoolReport.size()) {
+          if (!dataset.isDeletingBlock(bpid,
+              blockpoolReport.get(d).getBlockId())) {
             statsRecord.missingMemoryBlocks++;
-            addDifference(diffRecord, statsRecord, blockpoolReport[d]);
+            addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
           }
           d++;
         }
-        LOG.info(statsRecord.toString());
-      } //end for
-    } //end synchronized
+        diffs.addAll(bpid, diffRecord);
+        LOG.info("Scan Results: {}", statsRecord);
+      }
+    }
   }
 
   /**
    * Add the ScanInfo object to the list of differences and adjust the stats
-   * accordingly.  This method is called when a block is found on the disk,
-   * but the in-memory block is missing or does not match the block on the disk.
+   * accordingly. This method is called when a block is found on the disk, but
+   * the in-memory block is missing or does not match the block on the disk.
    *
-   * @param diffRecord the list to which to add the info
+   * @param diffRecord the collection to which to add the info
    * @param statsRecord the stats to update
    * @param info the differing info
    */
-  private void addDifference(LinkedList<ScanInfo> diffRecord, 
-                             Stats statsRecord, ScanInfo info) {
+  private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
+      ScanInfo info) {
     statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
     statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
     diffRecord.add(info);
   }
 
   /**
-   * Add a new ScanInfo object to the list of differences and adjust the stats
-   * accordingly.  This method is called when a block is not found on the disk.
+   * Add a new ScanInfo object to the collection of differences and adjust the
+   * stats accordingly. This method is called when a block is not found on the
+   * disk.
    *
-   * @param diffRecord the list to which to add the info
+   * @param diffRecord the collection to which to add the info
    * @param statsRecord the stats to update
    * @param blockId the id of the missing block
    * @param vol the volume that contains the missing block
    */
-  private void addDifference(LinkedList<ScanInfo> diffRecord,
-                             Stats statsRecord, long blockId,
-                             FsVolumeSpi vol) {
+  private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
+      long blockId, FsVolumeSpi vol) {
     statsRecord.missingBlockFile++;
     statsRecord.missingMetaFile++;
     diffRecord.add(new ScanInfo(blockId, null, null, vol));
   }
 
   /**
-   * Get the lists of blocks on the disks in the dataset, sorted by blockId.
-   * The returned map contains one entry per blockpool, keyed by the blockpool
-   * ID.
-   *
-   * @return a map of sorted arrays of block information
+   * Compile a block report for each non-PROVIDED volume in the data set.
    */
   @VisibleForTesting
-  public Map<String, ScanInfo[]> getDiskReport() {
-    ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
-    ScanInfoPerBlockPool[] dirReports = null;
+  public Collection<ScanInfoVolumeReport> getVolumeReports() {
+    List<ScanInfoVolumeReport> volReports = new ArrayList<>();
+    List<Future<ScanInfoVolumeReport>> compilersInProgress = new ArrayList<>();
+
     // First get list of data directories
     try (FsDatasetSpi.FsVolumeReferences volumes =
         dataset.getFsVolumeReferences()) {
 
-      // Use an array since the threads may return out of order and
-      // compilersInProgress#keySet may return out of order as well.
-      dirReports = new ScanInfoPerBlockPool[volumes.size()];
-
-      Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
-          new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
-
-      for (int i = 0; i < volumes.size(); i++) {
-        if (volumes.get(i).getStorageType() == StorageType.PROVIDED) {
-          // Disable scanning PROVIDED volumes to keep overhead low
-          continue;
+      for (final FsVolumeSpi volume : volumes) {
+        // Disable scanning PROVIDED volumes to keep overhead low
+        if (volume.getStorageType() != StorageType.PROVIDED) {
+          ReportCompiler reportCompiler = new ReportCompiler(volume);
+          Future<ScanInfoVolumeReport> result =
+              reportCompileThreadPool.submit(reportCompiler);
+          compilersInProgress.add(result);
         }
-        ReportCompiler reportCompiler =
-            new ReportCompiler(datanode, volumes.get(i));
-        Future<ScanInfoPerBlockPool> result =
-            reportCompileThreadPool.submit(reportCompiler);
-        compilersInProgress.put(i, result);
       }
 
-      for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
-          compilersInProgress.entrySet()) {
-        Integer index = report.getKey();
+      for (Future<ScanInfoVolumeReport> future : compilersInProgress) {
         try {
-          dirReports[index] = report.getValue().get();
-
-          // If our compiler threads were interrupted, give up on this run
-          if (dirReports[index] == null) {
-            dirReports = null;
+          final ScanInfoVolumeReport result = future.get();
+          if (!CollectionUtils.addIgnoreNull(volReports, result)) {
+            // This compiler thread was interrupted; give up on this run
+            volReports.clear();
             break;
           }
         } catch (Exception ex) {
-          FsVolumeSpi fsVolumeSpi = volumes.get(index);
-          LOG.error("Error compiling report for the volume, StorageId: "
-              + fsVolumeSpi.getStorageID(), ex);
-          // Continue scanning the other volumes
+          LOG.warn("Error compiling report. Continuing.", ex);
         }
       }
     } catch (IOException e) {
       LOG.error("Unexpected IOException by closing FsVolumeReference", e);
     }
-    if (dirReports != null) {
-      // Compile consolidated report for all the volumes
-      for (ScanInfoPerBlockPool report : dirReports) {
-        if(report != null){
-          list.addAll(report);
-        }
-      }
-    }
-    return list.toSortedArrays();
+
+    return volReports;
   }
 
   /**
    * The ReportCompiler class encapsulates the process of searching a datanode's
-   * disks for block information.  It operates by performing a DFS of the
-   * volume to discover block information.
+   * disks for block information. It operates by performing a DFS of the volume
+   * to discover block information.
    *
    * When the ReportCompiler discovers block information, it create a new
-   * ScanInfo object for it and adds that object to its report list.  The report
+   * ScanInfo object for it and adds that object to its report list. The report
    * list is returned by the {@link #call()} method.
    */
-  public class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
+  public class ReportCompiler implements Callable<ScanInfoVolumeReport> {
     private final FsVolumeSpi volume;
-    private final DataNode datanode;
     // Variable for tracking time spent running for throttling purposes
     private final StopWatch throttleTimer = new StopWatch();
     // Variable for tracking time spent running and waiting for testing
@@ -594,13 +635,11 @@ public class DirectoryScanner implements Runnable {
     private final StopWatch perfTimer = new StopWatch();
 
     /**
-     * Create a report compiler for the given volume on the given datanode.
+     * Create a report compiler for the given volume.
      *
-     * @param datanode the target datanode
      * @param volume the target volume
      */
-    public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
-      this.datanode = datanode;
+    public ReportCompiler(FsVolumeSpi volume) {
       this.volume = volume;
     }
 
@@ -608,12 +647,13 @@ public class DirectoryScanner implements Runnable {
      * Run this report compiler thread.
      *
      * @return the block info report list
-     * @throws IOException if the block pool isn't found
+     * @throws IOException if the block pool is not found
      */
     @Override
-    public ScanInfoPerBlockPool call() throws IOException {
+    public ScanInfoVolumeReport call() throws IOException {
       String[] bpList = volume.getBlockPoolList();
-      ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
+      ScanInfoVolumeReport result =
+          new ScanInfoVolumeReport(volume, Arrays.asList(bpList));
       perfTimer.start();
       throttleTimer.start();
       for (String bpid : bpList) {
@@ -623,33 +663,45 @@ public class DirectoryScanner implements Runnable {
         throttleTimer.reset().start();
 
         try {
-          result.put(bpid, volume.compileReport(bpid, report, this));
+          // ScanInfos are added directly to 'report' list
+          volume.compileReport(bpid, report, this);
+          result.addAll(bpid, report);
         } catch (InterruptedException ex) {
           // Exit quickly and flag the scanner to do the same
           result = null;
           break;
         }
       }
+      LOG.trace("Scanner volume report: {}", result);
       return result;
     }
 
     /**
-     * Called by the thread before each potential disk scan so that a pause
-     * can be optionally inserted to limit the number of scans per second.
-     * The limit is controlled by
+     * Called by the thread before each potential disk scan so that a pause can
+     * be optionally inserted to limit the number of scans per second. The limit
+     * is controlled by
      * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
      */
     public void throttle() throws InterruptedException {
       accumulateTimeRunning();
 
-      if ((throttleLimitMsPerSec < 1000) &&
-          (throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
-
-        Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
-        throttleTimer.reset().start();
+      if (throttleLimitMsPerSec > 0L) {
+        final long runningTime = throttleTimer.now(TimeUnit.MILLISECONDS);
+        if (runningTime >= throttleLimitMsPerSec) {
+          final long sleepTime;
+          if (runningTime >= 1000L) {
+            LOG.warn("Unable to throttle within the second. Blocking for 1s.");
+            sleepTime = 1000L;
+          } else {
+            // Sleep for the expected time plus any time processing ran over
+            final long overTime = runningTime - throttleLimitMsPerSec;
+            sleepTime = (1000L - throttleLimitMsPerSec) + overTime;
+          }
+          Thread.sleep(sleepTime);
+          throttleTimer.reset().start();
+        }
+        accumulateTimeWaiting();
       }
-
-      accumulateTimeWaiting();
     }
 
     /**
@@ -679,4 +731,5 @@ public class DirectoryScanner implements Runnable {
           || name.startsWith(Block.BLOCK_FILE_PREFIX);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 312bc86..e29a147 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -33,6 +33,7 @@ import java.net.URI;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -44,26 +45,23 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -73,14 +71,17 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Tests {@link DirectoryScanner} handling of differences
- * between blocks on the disk and block in memory.
+ * Tests {@link DirectoryScanner} handling of differences between blocks on the
+ * disk and blocks in memory.
  */
 public class TestDirectoryScanner {
   private static final Logger LOG =
@@ -102,7 +103,7 @@ public class TestDirectoryScanner {
     CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
     CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
     CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
-                 getMemlockLimit(Long.MAX_VALUE));
+        getMemlockLimit(Long.MAX_VALUE));
   }
 
   @Before
@@ -110,21 +111,20 @@ public class TestDirectoryScanner {
     LazyPersistTestCase.initCacheManipulator();
   }
 
-  /** create a file with a length of <code>fileLen</code> */
-  private List<LocatedBlock> createFile(String fileNamePrefix,
-                                        long fileLen,
-                                        boolean isLazyPersist) throws IOException {
+  /** Create a file with a length of <code>fileLen</code>. */
+  private List<LocatedBlock> createFile(String fileNamePrefix, long fileLen,
+      boolean isLazyPersist) throws IOException {
     FileSystem fs = cluster.getFileSystem();
     Path filePath = new Path("/" + fileNamePrefix + ".dat");
-    DFSTestUtil.createFile(
-        fs, filePath, isLazyPersist, 1024, fileLen,
+    DFSTestUtil.createFile(fs, filePath, isLazyPersist, 1024, fileLen,
         BLOCK_LENGTH, (short) 1, r.nextLong(), false);
-    return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
+    return client.getLocatedBlocks(filePath.toString(), 0, fileLen)
+        .getLocatedBlocks();
   }
 
-  /** Truncate a block file */
+  /** Truncate a block file. */
   private long truncateBlockFile() throws IOException {
-    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+    try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = new File(b.getBlockURI());
         File mf = new File(b.getMetadataURI());
@@ -149,7 +149,7 @@ public class TestDirectoryScanner {
 
   /** Delete a block file */
   private long deleteBlockFile() {
-    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+    try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = new File(b.getBlockURI());
         File mf = new File(b.getMetadataURI());
@@ -165,7 +165,7 @@ public class TestDirectoryScanner {
 
   /** Delete block meta file */
   private long deleteMetaFile() {
-    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+    try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         // Delete a metadata file
         if (b.metadataExists() && b.deleteMetadata()) {
@@ -179,11 +179,12 @@ public class TestDirectoryScanner {
 
   /**
    * Duplicate the given block on all volumes.
+   *
    * @param blockId
    * @throws IOException
    */
   private void duplicateBlock(long blockId) throws IOException {
-    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
+    try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
       ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
       try (FsDatasetSpi.FsVolumeReferences volumes =
           fds.getFsVolumeReferences()) {
@@ -199,16 +200,14 @@ public class TestDirectoryScanner {
           URI destRoot = v.getStorageLocation().getUri();
 
           String relativeBlockPath =
-              sourceRoot.relativize(sourceBlock.toURI())
-                  .getPath();
+              sourceRoot.relativize(sourceBlock.toURI()).getPath();
           String relativeMetaPath =
-              sourceRoot.relativize(sourceMeta.toURI())
-                  .getPath();
+              sourceRoot.relativize(sourceMeta.toURI()).getPath();
 
-          File destBlock = new File(new File(destRoot).toString(),
-              relativeBlockPath);
-          File destMeta = new File(new File(destRoot).toString(),
-              relativeMetaPath);
+          File destBlock =
+              new File(new File(destRoot).toString(), relativeBlockPath);
+          File destMeta =
+              new File(new File(destRoot).toString(), relativeMetaPath);
 
           destBlock.getParentFile().mkdirs();
           FileUtils.copyFile(sourceBlock, destBlock);
@@ -223,7 +222,7 @@ public class TestDirectoryScanner {
     }
   }
 
-  /** Get a random blockId that is not used already */
+  /** Get a random blockId that is not used already. */
   private long getFreeBlockId() {
     long id = rand.nextLong();
     while (true) {
@@ -244,14 +243,15 @@ public class TestDirectoryScanner {
         + Block.METADATA_EXTENSION;
   }
 
-  /** Create a block file in a random volume*/
+  /** Create a block file in a random volume. */
   private long createBlockFile() throws IOException {
     long id = getFreeBlockId();
-    try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
+    try (
+        FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
       int numVolumes = volumes.size();
       int index = rand.nextInt(numVolumes - 1);
-      File finalizedDir = ((FsVolumeImpl) volumes.get(index))
-          .getFinalizedDir(bpid);
+      File finalizedDir =
+          ((FsVolumeImpl) volumes.get(index)).getFinalizedDir(bpid);
       File file = new File(finalizedDir, getBlockFile(id));
       if (file.createNewFile()) {
         LOG.info("Created block file " + file.getName());
@@ -260,14 +260,14 @@ public class TestDirectoryScanner {
     return id;
   }
 
-  /** Create a metafile in a random volume*/
+  /** Create a metafile in a random volume. */
   private long createMetaFile() throws IOException {
     long id = getFreeBlockId();
     try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
       int numVolumes = refs.size();
       int index = rand.nextInt(numVolumes - 1);
-      File finalizedDir = ((FsVolumeImpl) refs.get(index))
-          .getFinalizedDir(bpid);
+      File finalizedDir =
+          ((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid);
       File file = new File(finalizedDir, getMetaFile(id));
       if (file.createNewFile()) {
         LOG.info("Created metafile " + file.getName());
@@ -276,7 +276,7 @@ public class TestDirectoryScanner {
     return id;
   }
 
-  /** Create block file and corresponding metafile in a rondom volume */
+  /** Create block file and corresponding metafile in a random volume. */
   private long createBlockMetaFile() throws IOException {
     long id = getFreeBlockId();
 
@@ -318,7 +318,7 @@ public class TestDirectoryScanner {
       long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks)
       throws IOException, InterruptedException, TimeoutException {
     scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
-         missingMemoryBlocks, mismatchBlocks, 0);
+        missingMemoryBlocks, mismatchBlocks, 0);
   }
 
   private void scan(long totalBlocks, int diffsize, long missingMetaFile,
@@ -332,22 +332,22 @@ public class TestDirectoryScanner {
         verifyStats(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
             missingMemoryBlocks, mismatchBlocks, duplicateBlocks);
       } catch (AssertionError ex) {
+        LOG.warn("Assertion Error", ex);
         return false;
       }
 
       return true;
-    }, 50, 2000);
+    }, 100, 2000);
   }
 
   private void verifyStats(long totalBlocks, int diffsize, long missingMetaFile,
       long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks,
       long duplicateBlocks) {
-    assertTrue(scanner.diffs.containsKey(bpid));
-    LinkedList<FsVolumeSpi.ScanInfo> diff = scanner.diffs.get(bpid);
-    assertTrue(scanner.stats.containsKey(bpid));
-    DirectoryScanner.Stats stats = scanner.stats.get(bpid);
-
+    Collection<FsVolumeSpi.ScanInfo> diff = scanner.diffs.getScanInfo(bpid);
     assertEquals(diffsize, diff.size());
+
+    DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+    assertNotNull(stats);
     assertEquals(totalBlocks, stats.totalBlocks);
     assertEquals(missingMetaFile, stats.missingMetaFile);
     assertEquals(missingBlockFile, stats.missingBlockFile);
@@ -356,20 +356,18 @@ public class TestDirectoryScanner {
     assertEquals(duplicateBlocks, stats.duplicateBlocks);
   }
 
-  @Test (timeout=300000)
+  @Test(timeout = 300000)
   public void testRetainBlockOnPersistentStorage() throws Exception {
-    cluster = new MiniDFSCluster
-        .Builder(CONF)
-        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
-        .numDataNodes(1)
-        .build();
+    cluster = new MiniDFSCluster.Builder(CONF)
+        .storageTypes(
+            new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1).build();
     try {
       cluster.waitActive();
-      DataNode dataNode = cluster.getDataNodes().get(0);
       bpid = cluster.getNamesystem().getBlockPoolId();
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
       client = cluster.getFileSystem().getClient();
-      scanner = new DirectoryScanner(dataNode, fds, CONF);
+      scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
       FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
 
@@ -397,24 +395,22 @@ public class TestDirectoryScanner {
     }
   }
 
-  @Test (timeout=300000)
+  @Test(timeout = 300000)
   public void testDeleteBlockOnTransientStorage() throws Exception {
-    cluster = new MiniDFSCluster
-        .Builder(CONF)
-        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
-        .numDataNodes(1)
-        .build();
+    cluster = new MiniDFSCluster.Builder(CONF)
+        .storageTypes(
+            new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1).build();
     try {
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
-      DataNode dataNode = cluster.getDataNodes().get(0);
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
       client = cluster.getFileSystem().getClient();
-      scanner = new DirectoryScanner(dataNode, fds, CONF);
+      scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
       FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
 
-      // Create a file file on RAM_DISK
+      // Create a file on RAM_DISK
       List<LocatedBlock> blocks =
           createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
 
@@ -440,14 +436,14 @@ public class TestDirectoryScanner {
     }
   }
 
-  @Test (timeout=600000)
+  @Test(timeout = 600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning
     for (int parallelism = 1; parallelism < 3; parallelism++) {
       runTest(parallelism);
     }
   }
-  
+
   public void runTest(int parallelism) throws Exception {
     cluster = new MiniDFSCluster.Builder(CONF).build();
     try {
@@ -456,9 +452,9 @@ public class TestDirectoryScanner {
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
       client = cluster.getFileSystem().getClient();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
-                  parallelism);
-      DataNode dataNode = cluster.getDataNodes().get(0);
-      scanner = new DirectoryScanner(dataNode, fds, CONF);
+          parallelism);
+
+      scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
 
       // Add files with 100 blocks
@@ -492,7 +488,7 @@ public class TestDirectoryScanner {
       // Test5: A metafile exists for which there is no block file and
       // a block in memory
       blockId = createMetaFile();
-      scan(totalBlocks+1, 1, 0, 1, 1, 0);
+      scan(totalBlocks + 1, 1, 0, 1, 1, 0);
       File metafile = new File(getMetaFile(blockId));
       assertTrue(!metafile.exists());
       scan(totalBlocks, 0, 0, 0, 0, 0);
@@ -521,7 +517,7 @@ public class TestDirectoryScanner {
       scan(totalBlocks, 0, 0, 0, 0, 0);
 
       // Test9: create a bunch of blocks files
-      for (int i = 0; i < 10 ; i++) {
+      for (int i = 0; i < 10; i++) {
         blockId = createBlockFile();
       }
       totalBlocks += 10;
@@ -529,14 +525,14 @@ public class TestDirectoryScanner {
       scan(totalBlocks, 0, 0, 0, 0, 0);
 
       // Test10: create a bunch of metafiles
-      for (int i = 0; i < 10 ; i++) {
+      for (int i = 0; i < 10; i++) {
         blockId = createMetaFile();
       }
-      scan(totalBlocks+10, 10, 0, 10, 10, 0);
+      scan(totalBlocks + 10, 10, 0, 10, 10, 0);
       scan(totalBlocks, 0, 0, 0, 0, 0);
 
       // Test11: create a bunch block files and meta files
-      for (int i = 0; i < 10 ; i++) {
+      for (int i = 0; i < 10; i++) {
         blockId = createBlockMetaFile();
       }
       totalBlocks += 10;
@@ -544,7 +540,7 @@ public class TestDirectoryScanner {
       scan(totalBlocks, 0, 0, 0, 0, 0);
 
       // Test12: truncate block files to test block length mismatch
-      for (int i = 0; i < 10 ; i++) {
+      for (int i = 0; i < 10; i++) {
         truncateBlockFile();
       }
       scan(totalBlocks, 10, 0, 0, 0, 10);
@@ -557,9 +553,9 @@ public class TestDirectoryScanner {
       deleteMetaFile();
       deleteBlockFile();
       truncateBlockFile();
-      scan(totalBlocks+3, 6, 2, 2, 3, 2);
-      scan(totalBlocks+1, 0, 0, 0, 0, 0);
-      
+      scan(totalBlocks + 3, 6, 2, 2, 3, 2);
+      scan(totalBlocks + 1, 0, 0, 0, 0, 0);
+
       // Test14: make sure no throttling is happening
       assertTrue("Throttle appears to be engaged",
           scanner.timeWaitingMs.get() < 10L);
@@ -567,10 +563,11 @@ public class TestDirectoryScanner {
           scanner.timeRunningMs.get() > 0L);
 
       // Test15: validate clean shutdown of DirectoryScanner
-      ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
+      //// assertTrue(scanner.getRunStatus()); // assumes a "real" FSDataset,
+      // not a simulated one
       scanner.shutdown();
       assertFalse(scanner.getRunStatus());
-      
+
     } finally {
       if (scanner != null) {
         scanner.shutdown();
@@ -582,17 +579,17 @@ public class TestDirectoryScanner {
 
   /**
    * Test that the timeslice throttle limits the report compiler thread's
-   * execution time correctly.  We test by scanning a large block pool and
+   * execution time correctly. We test by scanning a large block pool and
    * comparing the time spent waiting to the time spent running.
    *
-   * The block pool has to be large, or the ratio will be off.  The throttle
-   * allows the report compiler thread to finish its current cycle when
-   * blocking it, so the ratio will always be a little lower than expected.
-   * The smaller the block pool, the further off the ratio will be.
+   * The block pool has to be large, or the ratio will be off. The throttle
+   * allows the report compiler thread to finish its current cycle when blocking
+   * it, so the ratio will always be a little lower than expected. The smaller
+   * the block pool, the further off the ratio will be.
    *
    * @throws Exception thrown on unexpected failure
    */
-  @Test (timeout=600000)
+  @Test(timeout = 600000)
   public void testThrottling() throws Exception {
     Configuration conf = new Configuration(CONF);
 
@@ -611,10 +608,9 @@ public class TestDirectoryScanner {
       conf.setInt(
           DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
           100);
-      DataNode dataNode = cluster.getDataNodes().get(0);
 
-      final int maxBlocksPerFile = (int) DFSConfigKeys
-          .DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT;
+      final int maxBlocksPerFile =
+          (int) DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT;
       int numBlocksToCreate = blocks;
       while (numBlocksToCreate > 0) {
         final int toCreate = Math.min(maxBlocksPerFile, numBlocksToCreate);
@@ -627,7 +623,7 @@ public class TestDirectoryScanner {
       int retries = maxRetries;
 
       while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
-        scanner = new DirectoryScanner(dataNode, fds, conf);
+        scanner = new DirectoryScanner(fds, conf);
         ratio = runThrottleTest(blocks);
         retries -= 1;
       }
@@ -645,7 +641,7 @@ public class TestDirectoryScanner {
       retries = maxRetries;
 
       while ((retries > 0) && ((ratio < 2.75f) || (ratio > 4.5f))) {
-        scanner = new DirectoryScanner(dataNode, fds, conf);
+        scanner = new DirectoryScanner(fds, conf);
         ratio = runThrottleTest(blocks);
         retries -= 1;
       }
@@ -664,7 +660,7 @@ public class TestDirectoryScanner {
       retries = maxRetries;
 
       while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
-        scanner = new DirectoryScanner(dataNode, fds, conf);
+        scanner = new DirectoryScanner(fds, conf);
         ratio = runThrottleTest(blocks);
         retries -= 1;
       }
@@ -675,7 +671,7 @@ public class TestDirectoryScanner {
       assertTrue("Throttle is too permissive", ratio >= 7f);
 
       // Test with no limit
-      scanner = new DirectoryScanner(dataNode, fds, CONF);
+      scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
       scan(blocks, 0, 0, 0, 0, 0);
       scanner.shutdown();
@@ -686,7 +682,7 @@ public class TestDirectoryScanner {
       assertTrue("Report complier threads logged no execution time",
           scanner.timeRunningMs.get() > 0L);
 
-      // Test with a 1ms limit.  This also tests whether the scanner can be
+      // Test with a 1ms limit. This also tests whether the scanner can be
       // shutdown cleanly in mid stride.
       conf.setInt(
           DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
@@ -698,7 +694,7 @@ public class TestDirectoryScanner {
 
       try {
         while ((retries > 0) && (ratio < 10)) {
-          scanner = new DirectoryScanner(dataNode, fds, conf);
+          scanner = new DirectoryScanner(fds, conf);
           scanner.setRetainDiffs(true);
 
           final AtomicLong nowMs = new AtomicLong();
@@ -728,7 +724,7 @@ public class TestDirectoryScanner {
           }
 
           ratio =
-              (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
+              (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
           retries -= 1;
         }
       } finally {
@@ -737,8 +733,7 @@ public class TestDirectoryScanner {
 
       // We just want to test that it waits a lot, but it also runs some
       LOG.info("RATIO: " + ratio);
-      assertTrue("Throttle is too permissive",
-          ratio > 10);
+      assertTrue("Throttle is too permissive", ratio > 8);
       assertTrue("Report complier threads logged no execution time",
           scanner.timeRunningMs.get() > 0L);
 
@@ -746,7 +741,7 @@ public class TestDirectoryScanner {
       conf.setInt(
           DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
           0);
-      scanner = new DirectoryScanner(dataNode, fds, conf);
+      scanner = new DirectoryScanner(fds, conf);
       scanner.setRetainDiffs(true);
       scan(blocks, 0, 0, 0, 0, 0);
       scanner.shutdown();
@@ -761,7 +756,7 @@ public class TestDirectoryScanner {
       conf.setInt(
           DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
           1000);
-      scanner = new DirectoryScanner(dataNode, fds, conf);
+      scanner = new DirectoryScanner(fds, conf);
       scanner.setRetainDiffs(true);
       scan(blocks, 0, 0, 0, 0, 0);
       scanner.shutdown();
@@ -777,9 +772,8 @@ public class TestDirectoryScanner {
       conf.setInt(
           DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
           10);
-      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
-        1);
-      scanner = new DirectoryScanner(dataNode, fds, conf);
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+      scanner = new DirectoryScanner(fds, conf);
       scanner.setRetainDiffs(true);
       scanner.start();
 
@@ -805,7 +799,7 @@ public class TestDirectoryScanner {
     scanner.shutdown();
     assertFalse(scanner.getRunStatus());
 
-    return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
+    return (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
   }
 
   private void verifyAddition(long blockId, long genStamp, long size) {
@@ -836,7 +830,7 @@ public class TestDirectoryScanner {
     assertNotNull(memBlock);
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }
-  
+
   private void verifyStorageType(long blockId, boolean expectTransient) {
     final ReplicaInfo memBlock;
     memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
@@ -859,7 +853,7 @@ public class TestDirectoryScanner {
     public long getAvailable() throws IOException {
       return 0;
     }
-    
+
     public File getFinalizedDir(String bpid) throws IOException {
       return new File("/base/current/" + bpid + "/finalized");
     }
@@ -898,10 +892,11 @@ public class TestDirectoryScanner {
 
     @Override
     public BlockIterator loadBlockIterator(String bpid, String name)
-          throws IOException {
+        throws IOException {
       throw new UnsupportedOperationException();
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public FsDatasetSpi getDataset() {
       throw new UnsupportedOperationException();
@@ -923,8 +918,8 @@ public class TestDirectoryScanner {
     }
 
     @Override
-    public byte[] loadLastPartialChunkChecksum(
-        File blockFile, File metaFile) throws IOException {
+    public byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
+        throws IOException {
       return null;
     }
 
@@ -945,7 +940,6 @@ public class TestDirectoryScanner {
       return null;
     }
 
-
     @Override
     public VolumeCheckResult check(VolumeCheckContext context)
         throws Exception {
@@ -954,11 +948,11 @@ public class TestDirectoryScanner {
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
-  
+
   private final static String BPID_1 = "BP-783049782-127.0.0.1-1370971773491";
-  
+
   private final static String BPID_2 = "BP-367845636-127.0.0.1-5895645674231";
-      
+
   void testScanInfoObject(long blockId, File blockFile, File metaFile)
       throws Exception {
     FsVolumeSpi.ScanInfo scanInfo =
@@ -978,7 +972,7 @@ public class TestDirectoryScanner {
     }
     assertEquals(TEST_VOLUME, scanInfo.getVolume());
   }
-  
+
   void testScanInfoObject(long blockId) throws Exception {
     FsVolumeSpi.ScanInfo scanInfo =
         new FsVolumeSpi.ScanInfo(blockId, null, null, null);
@@ -987,7 +981,7 @@ public class TestDirectoryScanner {
     assertNull(scanInfo.getMetaFile());
   }
 
-  @Test(timeout=120000)
+  @Test(timeout = 120000)
   public void TestScanInfo() throws Exception {
     testScanInfoObject(123,
         new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
@@ -998,13 +992,10 @@ public class TestDirectoryScanner {
         new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
             "blk_123"),
         null);
-    testScanInfoObject(523,
-        null,
+    testScanInfoObject(523, null,
         new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
             "blk_123__1009.meta"));
-    testScanInfoObject(789,
-        null,
-        null);
+    testScanInfoObject(789, null, null);
     testScanInfoObject(456);
     testScanInfoObject(123,
         new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
@@ -1027,7 +1018,6 @@ public class TestDirectoryScanner {
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
       client = cluster.getFileSystem().getClient();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
-      DataNode dataNode = cluster.getDataNodes().get(0);
 
       // Add files with 2 blocks
       createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
@@ -1047,7 +1037,7 @@ public class TestDirectoryScanner {
       FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
       Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
 
-      scanner = new DirectoryScanner(dataNode, spyFds, CONF);
+      scanner = new DirectoryScanner(spyFds, CONF);
       scanner.setRetainDiffs(true);
       scanner.reconcile();
     } finally {
@@ -1061,28 +1051,27 @@ public class TestDirectoryScanner {
 
   @Test
   public void testDirectoryScannerInFederatedCluster() throws Exception {
-    //Create Federated cluster with two nameservices and one DN
+    // Create Federated cluster with two nameservices and one DN
     try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF)
         .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
         .numDataNodes(1).build()) {
       cluster.waitActive();
       cluster.transitionToActive(1);
       cluster.transitionToActive(3);
-      DataNode dataNode = cluster.getDataNodes().get(0);
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
-      //Create one block in first nameservice
+      // Create one block in first nameservice
       FileSystem fs = cluster.getFileSystem(1);
       int bp1Files = 1;
       writeFile(fs, bp1Files);
-      //Create two blocks in second nameservice
+      // Create two blocks in second nameservice
       FileSystem fs2 = cluster.getFileSystem(3);
       int bp2Files = 2;
       writeFile(fs2, bp2Files);
-      //Call the Directory scanner
-      scanner = new DirectoryScanner(dataNode, fds, CONF);
+      // Call the Directory scanner
+      scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
       scanner.reconcile();
-      //Check blocks in corresponding BP
+      // Check blocks in corresponding BP
 
       GenericTestUtils.waitFor(() -> {
         try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index 422acc3..a48e2f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -34,6 +34,7 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfoVolumeReport;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedProvidedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -73,9 +75,9 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.TestProvidedReplicaImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
@@ -183,7 +185,6 @@ public class TestProvidedImpl {
   public static class TestFileRegionBlockAliasMap
       extends BlockAliasMap<FileRegion> {
 
-    private Configuration conf;
     private int minId;
     private int numBlocks;
     private Iterator<FileRegion> suppliedIterator;
@@ -592,11 +593,13 @@ public class TestProvidedImpl {
 
   @Test
   public void testScannerWithProvidedVolumes() throws Exception {
-    DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
-    Map<String, FsVolumeSpi.ScanInfo[]> report = scanner.getDiskReport();
+    DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
+    Collection<ScanInfoVolumeReport> reports = scanner.getVolumeReports();
     // no blocks should be reported for the Provided volume as long as
     // the directoryScanner is disabled.
-    assertEquals(0, report.get(BLOCK_POOL_IDS[CHOSEN_BP_ID]).length);
+    for (ScanInfoVolumeReport report : reports) {
+      assertEquals(0, report.getScanInfo(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1dc0adfa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index e1c8ae3..db12146 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -63,14 +64,19 @@ public class TestListCorruptFileBlocks {
   @Test (timeout=300000)
   public void testListCorruptFilesCorruptedBlock() throws Exception {
     MiniDFSCluster cluster = null;
-    Random random = new Random();
-    
+
     try {
       Configuration conf = new HdfsConfiguration();
-      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); // datanode scans directories
-      conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000); // datanode sends block reports
+
+      // datanode scans directories
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+
+      // datanode sends block reports
+      conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000);
+
       // Set short retry timeouts so this test runs faster
       conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+
       cluster = new MiniDFSCluster.Builder(conf).build();
       FileSystem fs = cluster.getFileSystem();
 
@@ -84,8 +90,8 @@ public class TestListCorruptFileBlocks {
       final NameNode namenode = cluster.getNameNode();
       Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
         getNamesystem().listCorruptFileBlocks("/", null);
-      assertTrue("Namenode has " + badFiles.size()
-          + " corrupt files. Expecting None.", badFiles.size() == 0);
+      assertEquals("Namenode has " + badFiles.size()
+          + " corrupt files. Expecting None.", 0, badFiles.size());
 
       // Now deliberately corrupt one block
       String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -101,7 +107,7 @@ public class TestListCorruptFileBlocks {
       long position = channel.size() - 2;
       int length = 2;
       byte[] buffer = new byte[length];
-      random.nextBytes(buffer);
+      new Random(13L).nextBytes(buffer);
       channel.write(ByteBuffer.wrap(buffer), position);
       file.close();
       LOG.info("Deliberately corrupting file " + metaFile.getName() +
@@ -134,7 +140,6 @@ public class TestListCorruptFileBlocks {
   @Test (timeout=300000)
   public void testListCorruptFileBlocksInSafeMode() throws Exception {
     MiniDFSCluster cluster = null;
-    Random random = new Random();
 
     try {
       Configuration conf = new HdfsConfiguration();
@@ -164,8 +169,8 @@ public class TestListCorruptFileBlocks {
       // fetch bad file list from namenode. There should be none.
       Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = 
         cluster.getNameNode().getNamesystem().listCorruptFileBlocks("/", null);
-      assertTrue("Namenode has " + badFiles.size()
-          + " corrupt files. Expecting None.", badFiles.size() == 0);
+      assertEquals("Namenode has " + badFiles.size()
+          + " corrupt files. Expecting None.", 0, badFiles.size());
 
       // Now deliberately corrupt one block
       File storageDir = cluster.getInstanceStorageDir(0, 0);
@@ -181,7 +186,7 @@ public class TestListCorruptFileBlocks {
       long position = channel.size() - 2;
       int length = 2;
       byte[] buffer = new byte[length];
-      random.nextBytes(buffer);
+      new Random(13L).nextBytes(buffer);
       channel.write(ByteBuffer.wrap(buffer), position);
       file.close();
       LOG.info("Deliberately corrupting file " + metaFile.getName() +
@@ -318,9 +323,9 @@ public class TestListCorruptFileBlocks {
       }
       // Validate we get all the corrupt files
       LOG.info("Namenode has bad files. " + numCorrupt);
-      assertTrue(numCorrupt == 3);
-      // test the paging here
+      assertEquals(3, numCorrupt);
 
+      // test the paging here
       FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
       // now get the 2nd and 3rd file that is corrupt
@@ -331,7 +336,7 @@ public class TestListCorruptFileBlocks {
       FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
       numCorrupt = nextCorruptFileBlocks.size();
-      assertTrue(numCorrupt == 2);
+      assertEquals(2, numCorrupt);
       assertTrue(ncfb[0].block.getBlockName()
           .equalsIgnoreCase(cfb[1].block.getBlockName()));
 
@@ -339,14 +344,14 @@ public class TestListCorruptFileBlocks {
         namenode.getNamesystem()
           .listCorruptFileBlocks("/corruptData", cookie);
       numCorrupt = corruptFileBlocks.size();
-      assertTrue(numCorrupt == 0);
+      assertEquals(0, numCorrupt);
       // Do a listing on a dir which doesn't have any corrupt blocks and
       // validate
       util.createFiles(fs, "/goodData");
       corruptFileBlocks = 
         namenode.getNamesystem().listCorruptFileBlocks("/goodData", null);
       numCorrupt = corruptFileBlocks.size();
-      assertTrue(numCorrupt == 0);
+      assertEquals(0, numCorrupt);
       util.cleanup(fs, "/corruptData");
       util.cleanup(fs, "/goodData");
     } finally {
@@ -390,7 +395,7 @@ public class TestListCorruptFileBlocks {
       RemoteIterator<Path> corruptFileBlocks = 
         dfs.listCorruptFileBlocks(new Path("/corruptData"));
       int numCorrupt = countPaths(corruptFileBlocks);
-      assertTrue(numCorrupt == 0);
+      assertEquals(0, numCorrupt);
       // delete the blocks
       String bpid = cluster.getNamesystem().getBlockPoolId();
       // For loop through number of datadirectories per datanode (2)
@@ -426,7 +431,7 @@ public class TestListCorruptFileBlocks {
       }
       // Validate we get all the corrupt files
       LOG.info("Namenode has bad files. " + numCorrupt);
-      assertTrue(numCorrupt == 3);
+      assertEquals(3, numCorrupt);
 
       util.cleanup(fs, "/corruptData");
       util.cleanup(fs, "/goodData");
@@ -465,8 +470,9 @@ public class TestListCorruptFileBlocks {
       final NameNode namenode = cluster.getNameNode();
       Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
         getNamesystem().listCorruptFileBlocks("/srcdat2", null);
-      assertTrue("Namenode has " + badFiles.size() + " corrupt files. Expecting none.",
-          badFiles.size() == 0);
+      assertEquals(
+          "Namenode has " + badFiles.size() + " corrupt files. Expecting none.",
+          0, badFiles.size());
 
       // Now deliberately blocks from all files
       final String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -555,7 +561,7 @@ public class TestListCorruptFileBlocks {
       RemoteIterator<Path> corruptFileBlocks = dfs
           .listCorruptFileBlocks(new Path("corruptData"));
       int numCorrupt = countPaths(corruptFileBlocks);
-      assertTrue(numCorrupt == 0);
+      assertEquals(0, numCorrupt);
 
       // delete the blocks
       String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -589,7 +595,7 @@ public class TestListCorruptFileBlocks {
       }
       // Validate we get all the corrupt files
       LOG.info("Namenode has bad files. " + numCorrupt);
-      assertTrue("Failed to get corrupt files!", numCorrupt == 3);
+      assertEquals("Failed to get corrupt files!", 3, numCorrupt);
 
       util.cleanup(fs, "corruptData");
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message