hadoop-common-commits mailing list archives

From: z..@apache.org
Subject: [20/50] [abbrv] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)
Date: Mon, 26 Jan 2015 17:44:35 GMT
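The new code in this diff drives scanning with a per-volume FsVolumeSpi.BlockIterator and persists scan progress in a scanner.cursor file (replacing the old dncp_block_verification.log). A minimal sketch of that iterator loop, based on TestBlockScanner#testVolumeIteratorImpl below; the surrounding setup (volume, bpid) is assumed and exception handling is omitted:

    // Sketch based on TestBlockScanner below; "volume" (an FsVolumeSpi) and
    // "bpid" are assumed to come from a running datanode.
    FsVolumeSpi.BlockIterator iter = volume.newBlockIterator(bpid, "scanner");
    iter.setMaxStalenessMs(100);     // how stale a cached listing may be
    ExtendedBlock block;
    while ((block = iter.nextBlock()) != null) {
      // ... verify the block's checksums here ...
      iter.save();                   // persist the cursor so a restart can resume
    }
    // A previously saved cursor can be resumed later:
    // iter = volume.loadBlockIterator(bpid, "scanner");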
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
deleted file mode 100644
index 9e78c10..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.log4j.Level;
-import org.junit.Test;
-
-/**
- * This test verifies that block verification occurs on the datanode
- */
-public class TestDatanodeBlockScanner {
-  
-  private static final Log LOG = 
-                 LogFactory.getLog(TestDatanodeBlockScanner.class);
-  
-  private static final long TIMEOUT = 20000; // 20 sec.
-  
-  private static final Pattern pattern =
-             Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");
-  
-  private static final Pattern pattern_blockVerify =
-             Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");
-  
-  static {
-    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
-  }
-  /**
-   * This connects to datanode and fetches block verification data.
-   * It repeats this until the given block has a verification time > newTime.
-   * @param newTime - validation timestamps before newTime are "old", the
-   *            result of previous validations.  This method waits until a "new"
-   *            validation timestamp is obtained.  If no validator runs soon
-   *            enough, the method will time out.
-   * @return - the new validation timestamp
-   * @throws IOException
-   * @throws TimeoutException
-   */
-  private static long waitForVerification(int infoPort, FileSystem fs, 
-                          Path file, int blocksValidated, 
-                          long newTime, long timeout) 
-  throws IOException, TimeoutException {
-    URL url = new URL("http://localhost:" + infoPort +
-                      "/blockScannerReport?listblocks");
-    long lastWarnTime = Time.monotonicNow();
-    if (newTime <= 0) newTime = 1L;
-    long verificationTime = 0;
-    
-    String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
-    long failtime = (timeout <= 0) ? Long.MAX_VALUE 
-        : Time.monotonicNow() + timeout;
-    while (verificationTime < newTime) {
-      if (failtime < Time.monotonicNow()) {
-        throw new TimeoutException("failed to achieve block verification after "
-            + timeout + " msec.  Current verification timestamp = "
-            + verificationTime + ", requested verification time > " 
-            + newTime);
-      }
-      String response = DFSTestUtil.urlGet(url);
-      if(blocksValidated >= 0) {
-        for(Matcher matcher = pattern_blockVerify.matcher(response); matcher.find();) {
-          if (block.equals(matcher.group(1))) {
-            assertEquals(1, blocksValidated);
-            break;
-          }
-        }
-      }
-      for(Matcher matcher = pattern.matcher(response); matcher.find();) {
-        if (block.equals(matcher.group(1))) {
-          verificationTime = Long.parseLong(matcher.group(2));
-          break;
-        }
-      }
-      
-      if (verificationTime < newTime) {
-        long now = Time.monotonicNow();
-        if ((now - lastWarnTime) >= 5*1000) {
-          LOG.info("Waiting for verification of " + block);
-          lastWarnTime = now; 
-        }
-        try {
-          Thread.sleep(500);
-        } catch (InterruptedException ignored) {}
-      }
-    }
-    
-    return verificationTime;
-  }
-
-  @Test
-  public void testDatanodeBlockScanner() throws IOException, TimeoutException {
-    long startTime = Time.monotonicNow();
-    
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
-    cluster.waitActive();
-    
-    FileSystem fs = cluster.getFileSystem();
-    Path file1 = new Path("/tmp/testBlockVerification/file1");
-    Path file2 = new Path("/tmp/testBlockVerification/file2");
-    
-    /*
-     * Write the first file and restart the cluster.
-     */
-    DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
-    cluster.shutdown();
-
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(1)
-                                .format(false).build();
-    cluster.waitActive();
-    
-    DFSClient dfsClient =  new DFSClient(new InetSocketAddress("localhost", 
-                                         cluster.getNameNodePort()), conf);
-    fs = cluster.getFileSystem();
-    DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
-    
-    /*
-     * The cluster restarted. The block should be verified by now.
-     */
-    assertTrue(waitForVerification(dn.getInfoPort(), fs, file1, 1, startTime,
-        TIMEOUT) >= startTime);
-    
-    /*
-     * Create a new file and read the block. The block should be marked 
-     * verified since the client reads the block and verifies checksum. 
-     */
-    DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
-    IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), 
-                      conf, true); 
-    assertTrue(waitForVerification(dn.getInfoPort(), fs, file2, 2, startTime,
-        TIMEOUT) >= startTime);
-    
-    cluster.shutdown();
-  }
-
-  @Test
-  public void testBlockCorruptionPolicy() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
-    Random random = new Random();
-    FileSystem fs = null;
-    int rand = random.nextInt(3);
-
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-    Path file1 = new Path("/tmp/testBlockVerification/file1");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
-    
-    DFSTestUtil.waitReplication(fs, file1, (short)3);
-    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-
-    // Corrupt random replica of block 
-    assertTrue(cluster.corruptReplica(rand, block));
-
-    // Restart the datanode, hoping the corrupt block will be reported
-    cluster.restartDataNode(rand);
-
-    // We have 2 good replicas and block is not corrupt
-    DFSTestUtil.waitReplication(fs, file1, (short)2);
-    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-  
-    // Corrupt all replicas. Now, block should be marked as corrupt
-    // and we should get all the replicas 
-    assertTrue(cluster.corruptReplica(0, block));
-    assertTrue(cluster.corruptReplica(1, block));
-    assertTrue(cluster.corruptReplica(2, block));
-
-    // Trigger each of the DNs to scan this block immediately.
-    // The block pool scanner doesn't run frequently enough on its own
-    // to notice these, and due to HDFS-1371, the client won't report
-    // bad blocks to the NN when all replicas are bad.
-    for (DataNode dn : cluster.getDataNodes()) {
-      DataNodeTestUtils.runBlockScannerForBlock(dn, block);
-    }
-
-    // The block should now be marked as corrupt, and we should get back
-    // all of its replicas
-    DFSTestUtil.waitReplication(fs, file1, (short)3);
-    assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-    cluster.shutdown();
-  }
-  
-  /**
-   * testBlockCorruptionRecoveryPolicy.
-   * This tests recovery of corrupt replicas, first for one corrupt replica
-   * then for two. The test invokes blockCorruptionRecoveryPolicy which
-   * 1. Creates a block with desired number of replicas
-   * 2. Corrupts the desired number of replicas and restarts the datanodes
-   *    containing the corrupt replica. Additionally we also read the block
-   *    in case restarting does not report corrupt replicas.
-   *    Restarting or reading from the datanode would trigger reportBadBlocks 
-   *    to namenode.
-   *    NameNode adds it to corruptReplicasMap and neededReplication
-   * 3. Test waits until all corrupt replicas are reported, meanwhile
-   *    Re-replication brings the block back to healthy state
-   * 4. Test again waits until the block is reported with expected number
-   *    of good replicas.
-   */
-  @Test
-  public void testBlockCorruptionRecoveryPolicy1() throws Exception {
-    // Test recovery of 1 corrupt replica
-    LOG.info("Testing corrupt replica recovery for one corrupt replica");
-    blockCorruptionRecoveryPolicy(4, (short)3, 1);
-  }
-
-  @Test
-  public void testBlockCorruptionRecoveryPolicy2() throws Exception {
-    // Test recovery of 2 corrupt replicas
-    LOG.info("Testing corrupt replica recovery for two corrupt replicas");
-    blockCorruptionRecoveryPolicy(5, (short)3, 2);
-  }
-  
-  private void blockCorruptionRecoveryPolicy(int numDataNodes, 
-                                             short numReplicas,
-                                             int numCorruptReplicas) 
-                                             throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5L);
-
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
-    Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
-    DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
-    final int ITERATIONS = 10;
-
-    // Wait until block is replicated to numReplicas
-    DFSTestUtil.waitReplication(fs, file1, numReplicas);
-
-    for (int k = 0; ; k++) {
-      // Corrupt numCorruptReplicas replicas of block 
-      int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
-      for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
-        if (cluster.corruptReplica(i, block)) {
-          corruptReplicasDNIDs[j++] = i;
-          LOG.info("successfully corrupted block " + block + " on node " 
-                   + i + " " + cluster.getDataNodes().get(i).getDisplayName());
-        }
-      }
-      
-      // Restart the datanodes containing corrupt replicas 
-      // so they would be reported to namenode and re-replicated
-      // They MUST be restarted in reverse order from highest to lowest index,
-      // because the act of restarting them removes them from the ArrayList
-      // and causes the indexes of all nodes above them in the list to change.
-      for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
-        LOG.info("restarting node with corrupt replica: position " 
-            + i + " node " + corruptReplicasDNIDs[i] + " " 
-            + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
-        cluster.restartDataNode(corruptReplicasDNIDs[i]);
-      }
-
-      // Loop until all corrupt replicas are reported
-      try {
-        DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
-                                        block, numCorruptReplicas);
-      } catch(TimeoutException e) {
-        if (k > ITERATIONS) {
-          throw e;
-        }
-        LOG.info("Timed out waiting for corrupt replicas, trying again, iteration " + k);
-        continue;
-      }
-      break;
-    }
-    
-    // Loop until the block recovers after replication
-    DFSTestUtil.waitReplication(fs, file1, numReplicas);
-    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-
-    // Make sure the corrupt replica is invalidated and removed from
-    // corruptReplicasMap
-    DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
-        block, 0);
-    cluster.shutdown();
-  }
-  
-  /** Test if NameNode handles truncated blocks in block report */
-  @Test
-  public void testTruncatedBlockReport() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    final short REPLICATION_FACTOR = (short)2;
-    final Path fileName = new Path("/file1");
-
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-
-    long startTime = Time.monotonicNow();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-                                               .numDataNodes(REPLICATION_FACTOR)
-                                               .build();
-    cluster.waitActive();
-    
-    ExtendedBlock block;
-    try {
-      FileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
-      DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
-      block = DFSTestUtil.getFirstBlock(fs, fileName);
-    } finally {
-      cluster.shutdown();
-    }
-
-    // Restart cluster and confirm block is verified on datanode 0,
-    // then truncate it on datanode 0.
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(REPLICATION_FACTOR)
-                                .format(false)
-                                .build();
-    cluster.waitActive();
-    try {
-      FileSystem fs = cluster.getFileSystem();
-      int infoPort = cluster.getDataNodes().get(0).getInfoPort();
-      assertTrue(waitForVerification(infoPort, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
-      
-      // Truncate replica of block
-      if (!changeReplicaLength(cluster, block, 0, -1)) {
-        throw new IOException(
-            "failed to find or change length of replica on node 0 "
-            + cluster.getDataNodes().get(0).getDisplayName());
-      }      
-    } finally {
-      cluster.shutdown();
-    }
-
-    // Restart the cluster, add a node, and check that the truncated block is 
-    // handled correctly
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(REPLICATION_FACTOR)
-                                .format(false)
-                                .build();
-    cluster.startDataNodes(conf, 1, true, null, null);
-    cluster.waitActive();  // now we have 3 datanodes
-
-    // Assure the cluster has left safe mode.
-    cluster.waitClusterUp();
-    assertFalse("failed to leave safe mode", 
-        cluster.getNameNode().isInSafeMode());
-
-    try {
-      // wait for the truncated block to be detected by the block scanner,
-      // and the block to be replicated
-      DFSTestUtil.waitReplication(
-          cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
-      
-      // Make sure that truncated block will be deleted
-      waitForBlockDeleted(cluster, block, 0, TIMEOUT);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  /**
-   * Change the length of a block at datanode dnIndex
-   */
-  static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk,
-      int dnIndex, int lenDelta) throws IOException {
-    File blockFile = cluster.getBlockFile(dnIndex, blk);
-    if (blockFile != null && blockFile.exists()) {
-      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-      raFile.setLength(raFile.length()+lenDelta);
-      raFile.close();
-      return true;
-    }
-    LOG.info("failed to change length of block " + blk);
-    return false;
-  }
-  
-  private static void waitForBlockDeleted(MiniDFSCluster cluster,
-      ExtendedBlock blk, int dnIndex, long timeout) throws TimeoutException,
-      InterruptedException {
-    File blockFile = cluster.getBlockFile(dnIndex, blk);
-    long failtime = Time.monotonicNow()
-                    + ((timeout > 0) ? timeout : Long.MAX_VALUE);
-    while (blockFile != null && blockFile.exists()) {
-      if (failtime < Time.monotonicNow()) {
-        throw new TimeoutException("waited too long for blocks to be deleted: "
-            + blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; "));
-      }
-      Thread.sleep(100);
-      blockFile = cluster.getBlockFile(dnIndex, blk);
-    }
-  }
-  
-  private static final String BASE_PATH = (new File("/data/current/finalized"))
-      .getAbsolutePath();
-  
-  @Test
-  public void testReplicaInfoParsing() throws Exception {
-    testReplicaInfoParsingSingle(BASE_PATH);
-    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1");
-    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3");
-  }
-  
-  private static void testReplicaInfoParsingSingle(String subDirPath) {
-    File testFile = new File(subDirPath);
-    assertEquals(BASE_PATH, ReplicaInfo.parseBaseDir(testFile).baseDirPath);
-  }
-
-  @Test
-  public void testDuplicateScans() throws Exception {
-    long startTime = Time.monotonicNow();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
-        .numDataNodes(1).build();
-    FileSystem fs = null;
-    try {
-      fs = cluster.getFileSystem();
-      DataNode dataNode = cluster.getDataNodes().get(0);
-      int infoPort = dataNode.getInfoPort();
-      long scanTimeBefore = 0, scanTimeAfter = 0;
-      for (int i = 1; i < 10; i++) {
-        Path fileName = new Path("/test" + i);
-        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
-        waitForVerification(infoPort, fs, fileName, i, startTime, TIMEOUT);
-        if (i > 1) {
-          scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
-              DFSTestUtil.getFirstBlock(fs, new Path("/test" + (i - 1))));
-          assertFalse("scan time shoud not be 0", scanTimeAfter == 0);
-          assertEquals("There should not be duplicate scan", scanTimeBefore,
-              scanTimeAfter);
-        }
-
-        scanTimeBefore = DataNodeTestUtils.getLatestScanTime(dataNode,
-            DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
-      }
-      cluster.restartDataNode(0);
-      Thread.sleep(10000);
-      dataNode = cluster.getDataNodes().get(0);
-      scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
-          DFSTestUtil.getFirstBlock(fs, new Path("/test" + (9))));
-      assertEquals("There should not be duplicate scan", scanTimeBefore,
-          scanTimeAfter);
-    } finally {
-      IOUtils.closeStream(fs);
-      cluster.shutdown();
-    }
-  }
-  
-/**
- * This test verifies whether the block is added to the first location of 
- * BlockPoolSliceScanner#blockInfoSet
- */
-  @Test
-  public void testAddBlockInfoToFirstLocation() throws Exception {
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
-        .numDataNodes(1).build();
-    FileSystem fs = null;
-    try {
-      fs = cluster.getFileSystem();
-      DataNode dataNode = cluster.getDataNodes().get(0);
-      // Creating a bunch of blocks
-      for (int i = 1; i < 10; i++) {
-        Path fileName = new Path("/test" + i);
-        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
-      } 
-      // Get block of the first file created (file1)
-      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1"));
-      dataNode.getBlockScanner().setLastScanTimeDifference(block, 0);
-      // Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can
-      // scan the first set of blocks
-      Thread.sleep(10000);
-      Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime(
-          dataNode, block);
-      // Create another set of blocks
-      for (int i = 10; i < 20; i++) {
-        Path fileName = new Path("/test" + i);
-        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
-      }
-      dataNode.getBlockScanner().addBlock(block, true);
-      // Sleep so that BlockPoolSliceScanner can scan the second set of blocks
-      // and one block which we scheduled to rescan
-      Thread.sleep(10000);
-      // Get the lastScanTime of all of the second set of blocks
-      Set<Long> lastScanTimeSet = new HashSet<Long>();
-      for (int i = 10; i < 20; i++) {
-        long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode,
-            DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
-        lastScanTimeSet.add(lastScanTime);
-      }
-      Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime(
-          dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1")));
-      Long minimumLastScanTime = Collections.min(lastScanTimeSet);
-      assertTrue("The second scanTime for test1 block should be greater than "
-         + "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block);
-      assertTrue("The second scanTime for test1 block should be less than or"
-         + " equal to minimum of the lastScanTime of second set of blocks",
-          scanTime2Fortest1Block <= minimumLastScanTime);
-    } finally {
-      IOUtils.closeStream(fs);
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
index b88b5c2..d116f82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
@@ -444,8 +444,7 @@ public class TestReplication {
 
     // Change the length of a replica
     for (int i=0; i<cluster.getDataNodes().size(); i++) {
-      if (TestDatanodeBlockScanner.changeReplicaLength(cluster, block, i,
-          lenDelta)) {
+      if (DFSTestUtil.changeReplicaLength(cluster, block, i, lenDelta)) {
         break;
       }
     }

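For reference, the DFSTestUtil.changeReplicaLength helper that TestReplication now calls is presumably the method deleted from TestDatanodeBlockScanner above, moved more or less verbatim. A sketch of that assumed shape (the exact signature in DFSTestUtil is not shown in this diff):

    // Assumed shape of the relocated helper; mirrors the implementation
    // removed from TestDatanodeBlockScanner earlier in this message.
    public static boolean changeReplicaLength(MiniDFSCluster cluster,
        ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
      File blockFile = cluster.getBlockFile(dnIndex, blk);
      if (blockFile != null && blockFile.exists()) {
        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
        raFile.setLength(raFile.length() + lenDelta);
        raFile.close();
        return true;
      }
      return false;
    }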
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index f8f476d..2942d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -54,6 +53,7 @@ public class TestOverReplicatedBlocks {
   @Test
   public void testProcesOverReplicateBlock() throws Exception {
     Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
     conf.set(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
@@ -71,13 +71,14 @@ public class TestOverReplicatedBlocks {
       assertTrue(cluster.corruptReplica(0, block));
       DataNodeProperties dnProps = cluster.stopDataNode(0);
       // remove block scanner log to trigger block scanning
-      File scanLog = new File(MiniDFSCluster.getFinalizedDir(
+      File scanCursor = new File(new File(MiniDFSCluster.getFinalizedDir(
           cluster.getInstanceStorageDir(0, 0),
-          cluster.getNamesystem().getBlockPoolId()).getParent().toString()
-          + "/../dncp_block_verification.log.prev");
+          cluster.getNamesystem().getBlockPoolId()).getParent()).getParent(),
+          "scanner.cursor");
       //wait for one minute for deletion to succeed;
-      for(int i=0; !scanLog.delete(); i++) {
-        assertTrue("Could not delete log file in one minute", i < 60);
+      for(int i = 0; !scanCursor.delete(); i++) {
+        assertTrue("Could not delete " + scanCursor.getAbsolutePath() +
+            " in one minute", i < 60);
         try {
           Thread.sleep(1000);
         } catch (InterruptedException ignored) {}

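The path arithmetic above resolves to the same location that TestBlockScanner#testDatanodeCursor checks later in this message: the finalized directory is <volume>/current/<bpid>/current/finalized, so the two getParent() calls land on the block pool directory and the cursor file ends up at <volume>/current/<bpid>/scanner.cursor. A compact sketch of the equivalent construction, assuming vPath is the volume base path and bpid the block pool id:

    // Equivalent to the expression in the hunk above; matches the path
    // asserted in testDatanodeCursor.
    File cursor = new File(new File(new File(vPath, "current"), bpid),
        "scanner.cursor");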
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index e9557da..68c66a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -82,8 +83,8 @@ public abstract class BlockReportTestBase {
 
   private static short REPL_FACTOR = 1;
   private static final int RAND_LIMIT = 2000;
-  private static final long DN_RESCAN_INTERVAL = 5000;
-  private static final long DN_RESCAN_EXTRA_WAIT = 2 * DN_RESCAN_INTERVAL;
+  private static final long DN_RESCAN_INTERVAL = 1;
+  private static final long DN_RESCAN_EXTRA_WAIT = 3 * DN_RESCAN_INTERVAL;
   private static final int DN_N0 = 0;
   private static final int FILE_START = 0;
 
@@ -294,7 +295,7 @@ public abstract class BlockReportTestBase {
       }
     }
 
-    waitTil(DN_RESCAN_EXTRA_WAIT);
+    waitTil(TimeUnit.SECONDS.toMillis(DN_RESCAN_EXTRA_WAIT));
 
     // all blocks belong to the same file, hence same BP
     String poolId = cluster.getNamesystem().getBlockPoolId();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index f50afd4..fd51e52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -113,30 +113,6 @@ public class DataNodeTestUtils {
     return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
         dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname);
   }
-  
-  public static void runBlockScannerForBlock(DataNode dn, ExtendedBlock b) {
-    BlockPoolSliceScanner bpScanner = getBlockPoolScanner(dn, b);
-    bpScanner.verifyBlock(new ExtendedBlock(b.getBlockPoolId(),
-        new BlockPoolSliceScanner.BlockScanInfo(b.getLocalBlock())));
-  }
-
-  private static BlockPoolSliceScanner getBlockPoolScanner(DataNode dn,
-      ExtendedBlock b) {
-    DataBlockScanner scanner = dn.getBlockScanner();
-    BlockPoolSliceScanner bpScanner = scanner.getBPScanner(b.getBlockPoolId());
-    return bpScanner;
-  }
-
-  public static long getLatestScanTime(DataNode dn, ExtendedBlock b) {
-    BlockPoolSliceScanner scanner = getBlockPoolScanner(dn, b);
-    return scanner.getLastScanTime(b.getLocalBlock());
-  }
-
-  public static void shutdownBlockScanner(DataNode dn) {
-    if (dn.blockScanner != null) {
-      dn.blockScanner.shutdown();
-    }
-  }
 
   /**
    * This method is used for testing. 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 16b6350..0610b94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -484,6 +483,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     @Override
     public void releaseReservedSpace(long bytesToRelease) {
     }
+
+    @Override
+    public BlockIterator newBlockIterator(String bpid, String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BlockIterator loadBlockIterator(String bpid, String name)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FsDatasetSpi getDataset() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private final Map<String, Map<Block, BInfo>> blockMap
@@ -1238,11 +1253,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public RollingLogs createRollingLogs(String bpid, String prefix) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public FsVolumeSpi getVolume(ExtendedBlock b) {
     return volume;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
new file mode 100644
index 0000000..7eaa2bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -0,0 +1,680 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.Statistics;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBlockScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestBlockScanner.class);
+
+  @Before
+  public void before() {
+    BlockScanner.Conf.allowUnitTestSettings = true;
+    GenericTestUtils.setLogLevel(BlockScanner.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(VolumeScanner.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(FsVolumeImpl.LOG, Level.ALL);
+  }
+
+  private static void disableBlockScanner(Configuration conf) {
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 0L);
+  }
+
+  private static class TestContext implements Closeable {
+    final int numNameServices;
+    final MiniDFSCluster cluster;
+    final DistributedFileSystem[] dfs;
+    final String[] bpids;
+    final DataNode datanode;
+    final BlockScanner blockScanner;
+    final FsDatasetSpi<? extends FsVolumeSpi> data;
+    final List<? extends FsVolumeSpi> volumes;
+
+    TestContext(Configuration conf, int numNameServices) throws Exception {
+      this.numNameServices = numNameServices;
+      MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf).
+          numDataNodes(1).
+          storagesPerDatanode(1);
+      if (numNameServices > 1) {
+        bld.nnTopology(MiniDFSNNTopology.
+              simpleFederatedTopology(numNameServices));
+      }
+      cluster = bld.build();
+      cluster.waitActive();
+      dfs = new DistributedFileSystem[numNameServices];
+      for (int i = 0; i < numNameServices; i++) {
+        dfs[i] = cluster.getFileSystem(i);
+      }
+      bpids = new String[numNameServices];
+      for (int i = 0; i < numNameServices; i++) {
+        bpids[i] = cluster.getNamesystem(i).getBlockPoolId();
+      }
+      datanode = cluster.getDataNodes().get(0);
+      blockScanner = datanode.getBlockScanner();
+      for (int i = 0; i < numNameServices; i++) {
+        dfs[i].mkdirs(new Path("/test"));
+      }
+      data = datanode.getFSDataset();
+      volumes = data.getVolumes();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (cluster != null) {
+        for (int i = 0; i < numNameServices; i++) {
+          dfs[i].delete(new Path("/test"), true);
+        }
+        cluster.shutdown();
+      }
+    }
+
+    public void createFiles(int nsIdx, int numFiles, int length)
+          throws Exception {
+      for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
+        DFSTestUtil.createFile(dfs[nsIdx], getPath(blockIdx), length,
+            (short)1, 123L);
+      }
+    }
+
+    public Path getPath(int fileIdx) {
+      return new Path("/test/" + fileIdx);
+    }
+
+    public ExtendedBlock getFileBlock(int nsIdx, int fileIdx)
+          throws Exception {
+      return DFSTestUtil.getFirstBlock(dfs[nsIdx], getPath(fileIdx));
+    }
+  }
+
+  /**
+   * Test iterating through a bunch of blocks in a volume using a volume
+   * iterator.<p/>
+   *
+   * We will rewind the iterator when about halfway through the blocks.
+   *
+   * @param numFiles        The number of files to create.
+   * @param maxStaleness    The maximum staleness to allow with the iterator.
+   * @throws Exception
+   */
+  private void testVolumeIteratorImpl(int numFiles,
+              long maxStaleness) throws Exception {
+    Configuration conf = new Configuration();
+    disableBlockScanner(conf);
+    TestContext ctx = new TestContext(conf, 1);
+    ctx.createFiles(0, numFiles, 1);
+    assertEquals(1, ctx.volumes.size());
+    FsVolumeSpi volume = ctx.volumes.get(0);
+    ExtendedBlock savedBlock = null, loadedBlock = null;
+    boolean testedRewind = false, testedSave = false, testedLoad = false;
+    int blocksProcessed = 0, savedBlocksProcessed = 0;
+    try {
+      BPOfferService bpos[] = ctx.datanode.getAllBpOs();
+      assertEquals(1, bpos.length);
+      BlockIterator iter = volume.newBlockIterator(ctx.bpids[0], "test");
+      assertEquals(ctx.bpids[0], iter.getBlockPoolId());
+      iter.setMaxStalenessMs(maxStaleness);
+      while (true) {
+        HashSet<ExtendedBlock> blocks = new HashSet<ExtendedBlock>();
+        for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
+          blocks.add(ctx.getFileBlock(0, blockIdx));
+        }
+        while (true) {
+          ExtendedBlock block = iter.nextBlock();
+          if (block == null) {
+            break;
+          }
+          blocksProcessed++;
+          LOG.info("BlockIterator for {} found block {}, blocksProcessed = {}",
+              volume, block, blocksProcessed);
+          if (testedSave && (savedBlock == null)) {
+            savedBlock = block;
+          }
+          if (testedLoad && (loadedBlock == null)) {
+            loadedBlock = block;
+            // The block that we get back right after loading the iterator
+            // should be the same block we got back right after saving
+            // the iterator.
+            assertEquals(savedBlock, loadedBlock);
+          }
+          boolean blockRemoved = blocks.remove(block);
+          assertTrue("Found unknown block " + block, blockRemoved);
+          if (blocksProcessed > (numFiles / 3)) {
+            if (!testedSave) {
+              LOG.info("Processed {} blocks out of {}.  Saving iterator.",
+                  blocksProcessed, numFiles);
+              iter.save();
+              testedSave = true;
+              savedBlocksProcessed = blocksProcessed;
+            }
+          }
+          if (blocksProcessed > (numFiles / 2)) {
+            if (!testedRewind) {
+              LOG.info("Processed {} blocks out of {}.  Rewinding iterator.",
+                  blocksProcessed, numFiles);
+              iter.rewind();
+              break;
+            }
+          }
+          if (blocksProcessed > ((2 * numFiles) / 3)) {
+            if (!testedLoad) {
+              LOG.info("Processed {} blocks out of {}.  Loading iterator.",
+                  blocksProcessed, numFiles);
+              iter = volume.loadBlockIterator(ctx.bpids[0], "test");
+              iter.setMaxStalenessMs(maxStaleness);
+              break;
+            }
+          }
+        }
+        if (!testedRewind) {
+          testedRewind = true;
+          blocksProcessed = 0;
+          LOG.info("Starting again at the beginning...");
+          continue;
+        }
+        if (!testedLoad) {
+          testedLoad = true;
+          blocksProcessed = savedBlocksProcessed;
+          LOG.info("Starting again at the load point...");
+          continue;
+        }
+        assertEquals(numFiles, blocksProcessed);
+        break;
+      }
+    } finally {
+      ctx.close();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testVolumeIteratorWithoutCaching() throws Exception {
+    testVolumeIteratorImpl(5, 0);
+  }
+
+  @Test(timeout=60000)
+  public void testVolumeIteratorWithCaching() throws Exception {
+    testVolumeIteratorImpl(600, 100);
+  }
+
+  @Test(timeout=60000)
+  public void testDisableVolumeScanner() throws Exception {
+    Configuration conf = new Configuration();
+    disableBlockScanner(conf);
+    TestContext ctx = new TestContext(conf, 1);
+    try {
+      Assert.assertFalse(ctx.datanode.getBlockScanner().isEnabled());
+    } finally {
+      ctx.close();
+    }
+  }
+
+  public static class TestScanResultHandler extends ScanResultHandler {
+    static class Info {
+      boolean shouldRun = false;
+      final Set<ExtendedBlock> badBlocks = new HashSet<ExtendedBlock>();
+      final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>();
+      long blocksScanned = 0;
+      Semaphore sem = null;
+    }
+
+    private VolumeScanner scanner;
+
+    final static ConcurrentHashMap<String, Info> infos =
+        new ConcurrentHashMap<String, Info>();
+
+    static Info getInfo(FsVolumeSpi volume) {
+      Info newInfo = new Info();
+      Info prevInfo = infos.
+          putIfAbsent(volume.getStorageID(), newInfo);
+      return prevInfo == null ? newInfo : prevInfo;
+    }
+
+    @Override
+    public void setup(VolumeScanner scanner) {
+      this.scanner = scanner;
+      Info info = getInfo(scanner.volume);
+      LOG.info("about to start scanning.");
+      synchronized (info) {
+        while (!info.shouldRun) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+      LOG.info("starting scanning.");
+    }
+
+    @Override
+    public void handle(ExtendedBlock block, IOException e) {
+      LOG.info("handling block {} (exception {})", block, e);
+      Info info = getInfo(scanner.volume);
+      Semaphore sem;
+      synchronized (info) {
+        sem = info.sem;
+      }
+      if (sem != null) {
+        try {
+          sem.acquire();
+        } catch (InterruptedException ie) {
+          throw new RuntimeException("interrupted");
+        }
+      }
+      synchronized (info) {
+        if (!info.shouldRun) {
+          throw new RuntimeException("stopping volumescanner thread.");
+        }
+        if (e == null) {
+          info.goodBlocks.add(block);
+        } else {
+          info.badBlocks.add(block);
+        }
+        info.blocksScanned++;
+      }
+    }
+  }
+
+  private void testScanAllBlocksImpl(final boolean rescan) throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576L);
+    if (rescan) {
+      conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 100L);
+    } else {
+      conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    }
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 10;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+    final Set<ExtendedBlock> expectedBlocks = new HashSet<ExtendedBlock>();
+    for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
+      expectedBlocks.add(ctx.getFileBlock(0, i));
+    }
+    TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>(){
+      @Override
+      public Boolean get() {
+        TestScanResultHandler.Info info =
+            TestScanResultHandler.getInfo(ctx.volumes.get(0));
+        int numFoundBlocks = 0;
+        StringBuilder foundBlocksBld = new StringBuilder();
+        String prefix = "";
+        synchronized (info) {
+          for (ExtendedBlock block : info.goodBlocks) {
+            assertTrue(expectedBlocks.contains(block));
+            numFoundBlocks++;
+            foundBlocksBld.append(prefix).append(block);
+            prefix = ", ";
+          }
+          LOG.info("numFoundBlocks = {}.  blocksScanned = {}. Found blocks {}",
+              numFoundBlocks, info.blocksScanned, foundBlocksBld.toString());
+          if (rescan) {
+            return (numFoundBlocks == NUM_EXPECTED_BLOCKS) &&
+                     (info.blocksScanned >= 2 * NUM_EXPECTED_BLOCKS);
+          } else {
+            return numFoundBlocks == NUM_EXPECTED_BLOCKS;
+          }
+        }
+      }
+    }, 10, 60000);
+    if (!rescan) {
+      synchronized (info) {
+        assertEquals(NUM_EXPECTED_BLOCKS, info.blocksScanned);
+      }
+      Statistics stats = ctx.blockScanner.getVolumeStats(
+          ctx.volumes.get(0).getStorageID());
+      assertEquals(5 * NUM_EXPECTED_BLOCKS, stats.bytesScannedInPastHour);
+      assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedSinceRestart);
+      assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedInCurrentPeriod);
+      assertEquals(0, stats.scanErrorsSinceRestart);
+      assertEquals(1, stats.scansSinceRestart);
+    }
+    ctx.close();
+  }
+
+  /**
+   * Test scanning all blocks.  Set the scan period high enough that
+   * we shouldn't rescan any block during this test.
+   */
+  @Test(timeout=60000)
+  public void testScanAllBlocksNoRescan() throws Exception {
+    testScanAllBlocksImpl(false);
+  }
+
+  /**
+   * Test scanning all blocks.  Set the scan period low enough that
+   * we should rescan all blocks at least twice during this test.
+   */
+  @Test(timeout=60000)
+  public void testScanAllBlocksWithRescan() throws Exception {
+    testScanAllBlocksImpl(true);
+  }
+
+  /**
+   * Test that we don't scan too many blocks per second.
+   */
+  @Test(timeout=120000)
+  public void testScanRateLimit() throws Exception {
+    Configuration conf = new Configuration();
+    // Limit scan bytes per second dramatically
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 4096L);
+    // Scan continuously
+    conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 1L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 5;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4096);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    long startMs = Time.monotonicNow();
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    Thread.sleep(5000);
+    synchronized (info) {
+      long endMs = Time.monotonicNow();
+      // Should scan no more than one block a second.
+      long maxBlocksScanned = ((endMs + 999 - startMs) / 1000);
+      assertTrue(info.blocksScanned < maxBlocksScanned);
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          return info.blocksScanned > 0;
+        }
+      }
+    }, 1, 30000);
+    ctx.close();
+  }
+
+  @Test(timeout=120000)
+  public void testCorruptBlockHandling() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 5;
+    final int CORRUPT_INDEX = 3;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4);
+    ExtendedBlock badBlock = ctx.getFileBlock(0, CORRUPT_INDEX);
+    ctx.cluster.corruptBlockOnDataNodes(badBlock);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          return info.blocksScanned == NUM_EXPECTED_BLOCKS;
+        }
+      }
+    }, 3, 30000);
+    synchronized (info) {
+      assertTrue(info.badBlocks.contains(badBlock));
+      for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
+        if (i != CORRUPT_INDEX) {
+          ExtendedBlock block = ctx.getFileBlock(0, i);
+          assertTrue(info.goodBlocks.contains(block));
+        }
+      }
+    }
+    ctx.close();
+  }
+
+  /**
+   * Test that we save the scan cursor when shutting down the datanode, and
+   * restart scanning from there when the datanode is restarted.
+   */
+  @Test(timeout=120000)
+  public void testDatanodeCursor() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 10;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.sem = new Semaphore(5);
+      info.shouldRun = true;
+      info.notify();
+    }
+    // Scan the first 5 blocks
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          return info.blocksScanned == 5;
+        }
+      }
+    }, 3, 30000);
+    synchronized (info) {
+      assertEquals(5, info.goodBlocks.size());
+      assertEquals(5, info.blocksScanned);
+      info.shouldRun = false;
+    }
+    ctx.datanode.shutdown();
+    String vPath = ctx.volumes.get(0).getBasePath();
+    File cursorPath = new File(new File(new File(vPath, "current"),
+          ctx.bpids[0]), "scanner.cursor");
+    assertTrue("Failed to find cursor save file in " +
+        cursorPath.getAbsolutePath(), cursorPath.exists());
+    Set<ExtendedBlock> prevGoodBlocks = new HashSet<ExtendedBlock>();
+    synchronized (info) {
+      info.sem = new Semaphore(4);
+      prevGoodBlocks.addAll(info.goodBlocks);
+      info.goodBlocks.clear();
+    }
+
+    // The block that we were scanning when we shut down the DN won't get
+    // recorded.
+    // After restarting the datanode, we should scan the next 4 blocks.
+    ctx.cluster.restartDataNode(0);
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned != 9) {
+            LOG.info("Waiting for blocksScanned to reach 9.  It is at {}",
+                info.blocksScanned);
+          }
+          return info.blocksScanned == 9;
+        }
+      }
+    }, 3, 30000);
+    synchronized (info) {
+      assertEquals(4, info.goodBlocks.size());
+      info.goodBlocks.addAll(prevGoodBlocks);
+      assertEquals(9, info.goodBlocks.size());
+      assertEquals(9, info.blocksScanned);
+    }
+    ctx.datanode.shutdown();
+
+    // After restarting the datanode, we should not scan any more blocks.
+    // This is because we reached the end of the block pool earlier, and
+    // the scan period is much, much longer than the test time.
+    synchronized (info) {
+      info.sem = null;
+      info.shouldRun = false;
+      info.goodBlocks.clear();
+    }
+    ctx.cluster.restartDataNode(0);
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    Thread.sleep(3000);
+    synchronized (info) {
+      assertTrue(info.goodBlocks.isEmpty());
+    }
+    ctx.close();
+  }
+
+  @Test(timeout=120000)
+  public void testMultipleBlockPoolScanning() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 3);
+
+    // We scan 5 bytes per file (1 byte in file, 4 bytes of checksum)
+    final int BYTES_SCANNED_PER_FILE = 5;
+    final int NUM_FILES[] = new int[] { 1, 5, 10 };
+    int TOTAL_FILES = 0;
+    for (int i = 0; i < NUM_FILES.length; i++) {
+      TOTAL_FILES += NUM_FILES[i];
+    }
+    ctx.createFiles(0, NUM_FILES[0], 1);
+    ctx.createFiles(1, NUM_FILES[1], 1);
+    ctx.createFiles(2, NUM_FILES[2], 1);
+
+    // start scanning
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+
+    // Wait for all the block pools to be scanned.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          Statistics stats = ctx.blockScanner.getVolumeStats(
+              ctx.volumes.get(0).getStorageID());
+          if (stats.scansSinceRestart < 3) {
+            LOG.info("Waiting for scansSinceRestart to reach 3 (it is {})",
+                stats.scansSinceRestart);
+            return false;
+          }
+          if (!stats.eof) {
+            LOG.info("Waiting for eof.");
+            return false;
+          }
+          return true;
+        }
+      }
+    }, 3, 30000);
+
+    Statistics stats = ctx.blockScanner.getVolumeStats(
+        ctx.volumes.get(0).getStorageID());
+    assertEquals(TOTAL_FILES, stats.blocksScannedSinceRestart);
+    assertEquals(BYTES_SCANNED_PER_FILE * TOTAL_FILES,
+        stats.bytesScannedInPastHour);
+    ctx.close();
+  }
+
+  @Test(timeout=120000)
+  public void testNextSorted() throws Exception {
+    List<String> arr = new LinkedList<String>();
+    arr.add("1");
+    arr.add("3");
+    arr.add("5");
+    arr.add("7");
+    Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "2"));
+    Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "1"));
+    Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, ""));
+    Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, null));
+    Assert.assertEquals(null, FsVolumeImpl.nextSorted(arr, "9"));
+  }
+
+  @Test(timeout=120000)
+  public void testCalculateNeededBytesPerSec() throws Exception {
+    // If we didn't check anything in the last hour, we should scan now.
+    Assert.assertTrue(
+        VolumeScanner.calculateShouldScan(100, 0));
+
+    // If, on average, we checked 101 bytes/s during the last hour,
+    // stop checking now.
+    Assert.assertFalse(
+        VolumeScanner.calculateShouldScan(100, 101 * 3600));
+
+    // Target is 1 byte/s, but we didn't scan anything in the last minute.
+    // Should scan now.
+    Assert.assertTrue(
+        VolumeScanner.calculateShouldScan(1, 3540));
+
+    // Target is 100000 bytes/s, but we didn't scan anything in the last
+    // minute.  Should scan now.
+    Assert.assertTrue(
+        VolumeScanner.calculateShouldScan(100000L, 354000000L));
+
+    Assert.assertFalse(
+        VolumeScanner.calculateShouldScan(100000L, 365000000L));
+  }
+}
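
The restart sequence tested above hinges on the scanner persisting its
position: after the datanode shuts down, a "scanner.cursor" file must exist
under <volume>/current/<bpid>/, the resumed scan picks up after the last
recorded block rather than restarting the block pool, and a further restart
scans nothing because the pool was already finished and the 100-hour scan
period has not elapsed. As a rough, self-contained sketch of that
save-and-resume idea (illustrative only; ScannerCursorSketch and its
single-field file format are hypothetical, and the real FsVolumeImpl block
iterator persists richer state):

  import java.io.File;
  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import java.nio.file.Files;
  import java.nio.file.StandardCopyOption;

  public class ScannerCursorSketch {
    /**
     * Record the ID of the last block whose scan fully completed, so a
     * restarted scanner can resume after it instead of starting over.
     */
    static void saveCursor(File volumeDir, String bpid, String lastBlockId)
        throws IOException {
      File cursor = new File(new File(new File(volumeDir, "current"), bpid),
          "scanner.cursor");
      File tmp = new File(cursor.getParent(), cursor.getName() + ".tmp");
      Files.write(tmp.toPath(), lastBlockId.getBytes(StandardCharsets.UTF_8));
      // Write to a temp file and rename so a crash never leaves a
      // half-written cursor behind.
      Files.move(tmp.toPath(), cursor.toPath(),
          StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Return the saved position, or null to start from the beginning. */
    static String loadCursor(File volumeDir, String bpid) throws IOException {
      File cursor = new File(new File(new File(volumeDir, "current"), bpid),
          "scanner.cursor");
      if (!cursor.exists()) {
        return null;
      }
      return new String(Files.readAllBytes(cursor.toPath()),
          StandardCharsets.UTF_8).trim();
    }
  }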
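
testNextSorted above pins down the successor semantics the per-volume block
iterator depends on: given a sorted list, nextSorted returns the first entry
strictly greater than the key, a null or empty key means "start from the
beginning", and null comes back once the key is at or beyond the last entry.
A minimal standalone sketch of that contract (NextSortedSketch is a
hypothetical name, and FsVolumeImpl presumably uses a binary search rather
than this linear scan):

  import java.util.Arrays;
  import java.util.List;

  public class NextSortedSketch {
    /** First element strictly greater than key, or null if none. */
    static String nextSorted(List<String> sorted, String key) {
      if (key == null || key.isEmpty()) {
        return sorted.isEmpty() ? null : sorted.get(0);
      }
      for (String entry : sorted) {
        if (entry.compareTo(key) > 0) {
          return entry;
        }
      }
      return null;
    }

    public static void main(String[] args) {
      List<String> arr = Arrays.asList("1", "3", "5", "7");
      System.out.println(nextSorted(arr, "2"));   // 3
      System.out.println(nextSorted(arr, "1"));   // 3
      System.out.println(nextSorted(arr, ""));    // 1
      System.out.println(nextSorted(arr, null));  // 1
      System.out.println(nextSorted(arr, "9"));   // null
    }
  }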
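
The assertions in testCalculateNeededBytesPerSec are all consistent with a
simple rate check: keep scanning only while the average throughput over the
trailing hour is still at or below the configured target in bytes per second.
A sketch of that decision, inferred purely from the assertions
(ShouldScanSketch is a hypothetical name; the real
VolumeScanner.calculateShouldScan may track additional state):

  public class ShouldScanSketch {
    private static final long SECONDS_PER_HOUR = 3600L;

    /** Compare the achieved bytes/second over the past hour to the target. */
    static boolean calculateShouldScan(long targetBytesPerSec,
                                       long bytesScannedInPastHour) {
      long effectiveBytesPerSec = bytesScannedInPastHour / SECONDS_PER_HOUR;
      return effectiveBytesPerSec <= targetBytesPerSec;
    }

    public static void main(String[] args) {
      System.out.println(calculateShouldScan(100, 0));               // true
      System.out.println(calculateShouldScan(100, 101 * 3600));      // false
      System.out.println(calculateShouldScan(1, 3540));              // true
      System.out.println(calculateShouldScan(100000L, 354000000L));  // true
      System.out.println(calculateShouldScan(100000L, 365000000L));  // false
    }
  }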

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 1b8f243..82a1684 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -589,6 +589,22 @@ public class TestDirectoryScanner {
     @Override
     public void releaseReservedSpace(long bytesToRelease) {
     }
+
+    @Override
+    public BlockIterator newBlockIterator(String bpid, String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BlockIterator loadBlockIterator(String bpid, String name)
+          throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FsDatasetSpi getDataset() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
deleted file mode 100644
index 55b1739..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-import static org.junit.Assert.fail;
-
-
-public class TestMultipleNNDataBlockScanner {
-  private static final Log LOG = 
-    LogFactory.getLog(TestMultipleNNDataBlockScanner.class);
-  Configuration conf;
-  MiniDFSCluster cluster = null;
-  final String[] bpids = new String[3];
-  final FileSystem[] fs = new FileSystem[3];
-  
-  public void setUp() throws IOException {
-    conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 100);
-    cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
-        .build();
-    for (int i = 0; i < 3; i++) {
-      cluster.waitActive(i);
-    }
-    for (int i = 0; i < 3; i++) {
-      bpids[i] = cluster.getNamesystem(i).getBlockPoolId();
-    }
-    for (int i = 0; i < 3; i++) {
-      fs[i] = cluster.getFileSystem(i);
-    }
-    // Create 2 files on each namenode with 10 blocks each
-    for (int i = 0; i < 3; i++) {
-      DFSTestUtil.createFile(fs[i], new Path("file1"), 1000, (short) 1, 0);
-      DFSTestUtil.createFile(fs[i], new Path("file2"), 1000, (short) 1, 1);
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void testDataBlockScanner() throws IOException, InterruptedException {
-    setUp();
-    try {
-      DataNode dn = cluster.getDataNodes().get(0);
-      for (int i = 0; i < 3; i++) {
-        long blocksScanned = 0;
-        while (blocksScanned != 20) {
-          blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
-          LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i]
-              + "; Scanned so far=" + blocksScanned);
-          Thread.sleep(5000);
-        }
-      }
-
-      StringBuilder buffer = new StringBuilder();
-      dn.blockScanner.printBlockReport(buffer, false);
-      LOG.info("Block Report\n" + buffer.toString());
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void testBlockScannerAfterRefresh() throws IOException,
-      InterruptedException {
-    setUp();
-    try {
-      Configuration dnConf = cluster.getDataNodes().get(0).getConf();
-      Configuration conf = new HdfsConfiguration(dnConf);
-      StringBuilder namenodesBuilder = new StringBuilder();
-
-      String bpidToShutdown = cluster.getNamesystem(2).getBlockPoolId();
-      for (int i = 0; i < 2; i++) {
-        String nsId = DFSUtil.getNamenodeNameServiceId(cluster
-            .getConfiguration(i));
-        namenodesBuilder.append(nsId);
-        namenodesBuilder.append(",");
-      }
-
-      conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder
-          .toString());
-      DataNode dn = cluster.getDataNodes().get(0);
-      dn.refreshNamenodes(conf);
-
-      try {
-        while (true) {
-          dn.blockScanner.getBlocksScannedInLastRun(bpidToShutdown);
-          Thread.sleep(1000);
-        }
-      } catch (IOException ex) {
-        // Expected
-        LOG.info(ex.getMessage());
-      }
-
-      namenodesBuilder.append(DFSUtil.getNamenodeNameServiceId(cluster
-          .getConfiguration(2)));
-      conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder
-          .toString());
-      dn.refreshNamenodes(conf);
-
-      for (int i = 0; i < 3; i++) {
-        long blocksScanned = 0;
-        while (blocksScanned != 20) {
-          blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
-          LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i]
-              + "; Scanned so far=" + blocksScanned);
-          Thread.sleep(5000);
-        }
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void testBlockScannerAfterRestart() throws IOException,
-      InterruptedException {
-    setUp();
-    try {
-      cluster.restartDataNode(0);
-      cluster.waitActive();
-      DataNode dn = cluster.getDataNodes().get(0);
-      for (int i = 0; i < 3; i++) {
-        while (!dn.blockScanner.isInitialized(bpids[i])) {
-          Thread.sleep(1000);
-        }
-        long blocksScanned = 0;
-        while (blocksScanned != 20) {
-          if (dn.blockScanner != null) {
-            blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
-            LOG.info("Waiting for all blocks to be scanned for bpid="
-                + bpids[i] + "; Scanned so far=" + blocksScanned);
-          }
-          Thread.sleep(5000);
-        }
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void test2NNBlockRescanInterval() throws IOException {
-    ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
-    Configuration conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
-        .build();
-
-    try {
-      FileSystem fs = cluster.getFileSystem(1);
-      Path file2 = new Path("/test/testBlockScanInterval");
-      DFSTestUtil.createFile(fs, file2, 30, (short) 1, 0);
-
-      fs = cluster.getFileSystem(0);
-      Path file1 = new Path("/test/testBlockScanInterval");
-      DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
-      for (int i = 0; i < 8; i++) {
-        LOG.info("Verifying that the blockscanner scans exactly once");
-        waitAndScanBlocks(1, 1);
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * HDFS-3828: DN rescans blocks too frequently
-   * 
-   * @throws Exception
-   */
-  @Test(timeout=120000)
-  public void testBlockRescanInterval() throws IOException {
-    ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
-    Configuration conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster.Builder(conf).build();
-
-    try {
-      FileSystem fs = cluster.getFileSystem();
-      Path file1 = new Path("/test/testBlockScanInterval");
-      DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
-      for (int i = 0; i < 4; i++) {
-        LOG.info("Verifying that the blockscanner scans exactly once");
-        waitAndScanBlocks(1, 1);
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  void waitAndScanBlocks(long scansLastRun, long scansTotal)
-      throws IOException {
-    // DataBlockScanner will run for every 5 seconds so we are checking for
-    // every 5 seconds
-    int n = 5;
-    String bpid = cluster.getNamesystem(0).getBlockPoolId();
-    DataNode dn = cluster.getDataNodes().get(0);
-    long blocksScanned, total;
-    do {
-      try {
-        Thread.sleep(SLEEP_PERIOD_MS);
-      } catch (InterruptedException e) {
-        fail("Interrupted: " + e);
-      }
-      blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpid);
-      total = dn.blockScanner.getTotalScans(bpid);
-      LOG.info("bpid = " + bpid + " blocksScanned = " + blocksScanned + " total=" + total);
-    } while (n-- > 0 && (blocksScanned != scansLastRun || scansTotal != total));
-    Assert.assertEquals(scansTotal, total);
-    Assert.assertEquals(scansLastRun, blocksScanned);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index f256ee6..8fd51d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -51,12 +50,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
       StorageType.DEFAULT);
 
   @Override
-  public RollingLogs createRollingLogs(String bpid, String prefix)
-      throws IOException {
-    return new ExternalRollingLogs();
-  }
-
-  @Override
   public List<ExternalVolumeImpl> getVolumes() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
deleted file mode 100644
index c9fb7c8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.extdataset;
-
-import java.io.IOException;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-
-public class ExternalRollingLogs implements RollingLogs {
-
-  private class ExternalLineIterator implements LineIterator {
-    @Override
-    public boolean isPrevious() {
-      return false;
-    }
-
-    @Override
-    public boolean isLastReadFromPrevious() {
-      return false;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return false;
-    }
-
-    @Override
-    public String next() {
-      return null;
-    }
-
-    @Override
-    public void remove() {
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-  }
-
-  private class ExternalAppender implements Appender {
-    @Override
-    public Appendable append(CharSequence cs) throws IOException {
-      return null;
-    }
-
-    @Override
-    public Appendable append(CharSequence cs, int i, int i1)
-	throws IOException {
-      return null;
-    }
-
-    @Override
-    public Appendable append(char c) throws IOException {
-      return null;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-  }
-
-  @Override
-  public LineIterator iterator(boolean skipPrevious) throws IOException {
-    return new ExternalLineIterator();
-  }
-
-  @Override
-  public Appender appender() {
-    return new ExternalAppender();
-  }
-
-  @Override
-  public boolean roll() throws IOException {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 857e946..0ea33bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
@@ -79,4 +80,20 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
   @Override
   public void releaseReservedSpace(long bytesToRelease) {
   }
+
+  @Override
+  public BlockIterator newBlockIterator(String bpid, String name) {
+    return null;
+  }
+
+  @Override
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public FsDatasetSpi getDataset() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
index 791bb76..82a6951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.junit.Test;
 
 /**
@@ -80,14 +79,6 @@ public class TestExternalDataset {
   }
 
   /**
-   * Tests instantiating a RollingLogs subclass.
-   */
-  @Test
-  public void testInstantiateRollingLogs() throws Throwable {
-    RollingLogs inst = new ExternalRollingLogs();
-  }
-
-  /**
    * Tests instantiating an FsVolumeSpi subclass.
    */
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
index f92d949..691d390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
@@ -41,16 +43,21 @@ public class FsVolumeListTest {
       new RoundRobinVolumeChoosingPolicy<>();
   private FsDatasetImpl dataset = null;
   private String baseDir;
+  private BlockScanner blockScanner;
 
   @Before
   public void setUp() {
     dataset = mock(FsDatasetImpl.class);
     baseDir = new FileSystemTestHelper().getTestRootDir();
+    Configuration blockScannerConf = new Configuration();
+    blockScannerConf.setInt(DFSConfigKeys.
+        DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    blockScanner = new BlockScanner(null, blockScannerConf);
   }
 
   @Test
   public void testGetNextVolumeWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
     List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);
@@ -59,7 +66,7 @@ public class FsVolumeListTest {
           conf, StorageType.DEFAULT);
       volume.setCapacityForTesting(1024 * 1024 * 1024);
       volumes.add(volume);
-      volumeList.addVolume(volume);
+      volumeList.addVolume(volume.obtainReference());
     }
 
     // Close the second volume.
@@ -75,7 +82,7 @@ public class FsVolumeListTest {
 
   @Test
   public void testCheckDirsWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
     List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "volume-" + i);
@@ -83,7 +90,7 @@ public class FsVolumeListTest {
       FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
           conf, StorageType.DEFAULT);
       volumes.add(volume);
-      volumeList.addVolume(volume);
+      volumeList.addVolume(volume.obtainReference());
     }
 
     // Close the 2nd volume.
@@ -91,4 +98,4 @@ public class FsVolumeListTest {
     // checkDirs() should ignore the 2nd volume since it is closed.
     volumeList.checkDirs();
   }
-}
\ No newline at end of file
+}
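
A pattern that recurs in this hunk and again in the TestFsDatasetImpl changes
below: tests that build an FsVolumeList directly now have to hand it a
BlockScanner, and they pass one that is effectively disabled by giving
dfs.datanode.scan.period.hours a non-positive value (-1 here, 0 below), so no
VolumeScanner threads should be spawned during the test. A condensed version
of that setup (DisabledBlockScannerSketch is a hypothetical helper; the
constructor call mirrors the one in setUp() above):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.server.datanode.BlockScanner;

  public class DisabledBlockScannerSketch {
    /** A scanner whose non-positive scan period keeps it from running. */
    static BlockScanner newDisabledBlockScanner() {
      Configuration conf = new Configuration();
      conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
      return new BlockScanner(null, conf);
    }
  }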

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index ca936b3..f3d15de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -22,17 +22,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -51,19 +51,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -81,7 +79,6 @@ public class TestFsDatasetImpl {
   private Configuration conf;
   private DataNode datanode;
   private DataStorage storage;
-  private DataBlockScanner scanner;
   private FsDatasetImpl dataset;
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
@@ -112,13 +109,14 @@ public class TestFsDatasetImpl {
   public void setUp() throws IOException {
     datanode = mock(DataNode.class);
     storage = mock(DataStorage.class);
-    scanner = mock(DataBlockScanner.class);
     this.conf = new Configuration();
+    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
     final DNConf dnConf = new DNConf(conf);
 
     when(datanode.getConf()).thenReturn(conf);
     when(datanode.getDnConf()).thenReturn(dnConf);
-    when(datanode.getBlockScanner()).thenReturn(scanner);
+    final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
+    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
 
     createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
     dataset = new FsDatasetImpl(datanode, storage, conf);
@@ -208,10 +206,6 @@ public class TestFsDatasetImpl {
     assertEquals("The replica infos on this volume has been removed from the "
                  + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
                  totalNumReplicas);
-
-    // Verify that every BlockPool deletes the removed blocks from the volume.
-    verify(scanner, times(BLOCK_POOL_IDS.length))
-        .deleteBlocks(anyString(), any(Block[].class));
   }
 
   @Test(timeout = 5000)
@@ -245,7 +239,9 @@ public class TestFsDatasetImpl {
   public void testChangeVolumeWithRunningCheckDirs() throws IOException {
     RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
         new RoundRobinVolumeChoosingPolicy<>();
-    final FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    final BlockScanner blockScanner = new BlockScanner(datanode, conf);
+    final FsVolumeList volumeList =
+        new FsVolumeList(0, blockScanner, blockChooser);
     final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
 
     // Initialize FsVolumeList with 5 mock volumes.
@@ -254,19 +250,23 @@ public class TestFsDatasetImpl {
       FsVolumeImpl volume = mock(FsVolumeImpl.class);
       oldVolumes.add(volume);
       when(volume.getBasePath()).thenReturn("data" + i);
-      volumeList.addVolume(volume);
+      FsVolumeReference ref = mock(FsVolumeReference.class);
+      when(ref.getVolume()).thenReturn(volume);
+      volumeList.addVolume(ref);
     }
 
     // When checkDirs() is called on the 2nd volume, another "thread" removes the 5th
     // volume and adds another volume. It does not affect checkDirs() running.
     final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
+    final FsVolumeReference newRef = mock(FsVolumeReference.class);
+    when(newRef.getVolume()).thenReturn(newVolume);
     FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
     doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocationOnMock)
           throws Throwable {
         volumeList.removeVolume(new File("data4"));
-        volumeList.addVolume(newVolume);
+        volumeList.addVolume(newRef);
         return null;
       }
     }).when(blockedVolume).checkDirs();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index 3609684..6cc3d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -184,8 +184,8 @@ public class TestInterDatanodeProtocol {
       InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
           datanode, datanodeinfo[0], conf, useDnHostname);
       
-      //stop block scanner, so we could compare lastScanTime
-      DataNodeTestUtils.shutdownBlockScanner(datanode);
+      // Stop the block scanners.
+      datanode.getBlockScanner().removeAllVolumeScanners();
 
       //verify BlockMetaDataInfo
       ExtendedBlock b = locatedblock.getBlock();

