hadoop-common-commits mailing list archives

From cnaur...@apache.org
Subject [1/3] HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.
Date Mon, 27 Oct 2014 19:59:19 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 6724c2f7e -> e8d77593f
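
For context, the tests in this patch exercise files written with the
LAZY_PERSIST create flag, which asks HDFS to place the replica on
memory-backed (RAM_DISK) storage and persist it to disk asynchronously. A
minimal sketch of the client-side call, modeled on the fs.create(...)
invocation in makeTestFile below; the path and sizes here are illustrative:

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class LazyPersistWriteSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // CREATE + LAZY_PERSIST requests a RAM_DISK replica; HDFS falls
        // back to disk if no memory storage is available.
        EnumSet<CreateFlag> flags =
            EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST);
        FSDataOutputStream fos = fs.create(new Path("/tmp/lazy.01.dat"),
            FsPermission.getFileDefault(), flags,
            4096,               // buffer size, as in the tests below
            (short) 1,          // replication; lazy persist implies 1
            5 * 1024 * 1024,    // block size, matching BLOCK_SIZE below
            null);              // no Progressable
        try {
          fos.write(new byte[4096]);
          fos.hsync();          // flush so the block is allocated
        } finally {
          fos.close();
        }
      }
    }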


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
new file mode 100644
index 0000000..c762849
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public abstract class LazyPersistTestCase {
+
+  static {
+    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
+  protected static final int BUFFER_LENGTH = 4096;
+  protected static final int EVICTION_LOW_WATERMARK = 1;
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+  protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
+  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
+  protected static final short REPL_FACTOR = 1;
+
+  protected MiniDFSCluster cluster;
+  protected DistributedFileSystem fs;
+  protected DFSClient client;
+  protected JMXGet jmx;
+  protected TemporarySocketDirectory sockDir;
+
+  @After
+  public void shutDownCluster() throws Exception {
+
+    // Dump all RamDisk JMX metrics before shutting down the cluster.
+    printRamDiskJMXMetrics();
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+      client = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    if (jmx != null) {
+      jmx = null;
+    }
+
+    IOUtils.closeQuietly(sockDir);
+    sockDir = null;
+  }
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  protected final LocatedBlocks ensureFileReplicasOnStorageType(
+      Path path, StorageType storageType) throws IOException {
+    // Ensure that the returned block locations are correct!
+    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    assertThat(fs.exists(path), is(true));
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+        client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+    }
+    return locatedBlocks;
+  }
+
+  protected final void makeRandomTestFile(Path path, long length,
+      boolean isLazyPersist, long seed) throws IOException {
+    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+      BLOCK_SIZE, REPL_FACTOR, seed, true);
+  }
+
+  protected final void makeTestFile(Path path, long length,
+      boolean isLazyPersist) throws IOException {
+
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+
+    FSDataOutputStream fos = null;
+    try {
+      fos =
+          fs.create(path,
+              FsPermission.getFileDefault(),
+              createFlags,
+              BUFFER_LENGTH,
+              REPL_FACTOR,
+              BLOCK_SIZE,
+              null);
+
+      // Allocate a block.
+      byte[] buffer = new byte[BUFFER_LENGTH];
+      for (int bytesWritten = 0; bytesWritten < length; ) {
+        fos.write(buffer, 0, buffer.length);
+        bytesWritten += buffer.length;
+      }
+      if (length > 0) {
+        fos.hsync();
+      }
+    } finally {
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
+  /**
+   * If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is
+   * artificially capped. If ramDiskReplicaCapacity < 0 then it is ignored.
+   */
+  protected final void startUpCluster(boolean hasTransientStorage,
+                                      final int ramDiskReplicaCapacity,
+                                      final boolean useSCR,
+                                      final boolean useLegacyBlockReaderLocal)
+      throws IOException {
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+                LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+                HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+                LAZY_WRITER_INTERVAL_SEC);
+    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
+                EVICTION_LOW_WATERMARK * BLOCK_SIZE);
+
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+      // Do not share a client context across tests.
+      conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+      if (useLegacyBlockReaderLocal) {
+        conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+        conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+            UserGroupInformation.getCurrentUser().getShortUserName());
+      } else {
+        sockDir = new TemporarySocketDirectory();
+        conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+            this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+      }
+    }
+
+    long[] capacities = null;
+    if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+      // Convert replica count to byte count, add some delta for .meta and
+      // VERSION files.
+      long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
+          (BLOCK_SIZE - 1);
+      capacities = new long[] { ramDiskStorageLimit, -1 };
+    }
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(REPL_FACTOR)
+        .storageCapacities(capacities)
+        .storageTypes(hasTransientStorage ?
+            new StorageType[]{ RAM_DISK, DEFAULT } : null)
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+    try {
+      jmx = initJMX();
+    } catch (Exception e) {
+      fail("Failed initialize JMX for testing: " + e);
+    }
+    LOG.info("Cluster startup complete");
+  }
+
+  /**
+   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
+   * capped. If ramDiskStorageLimit < 0 then it is ignored.
+   */
+  protected final void startUpCluster(final int numDataNodes,
+                                      final StorageType[] storageTypes,
+                                      final long ramDiskStorageLimit,
+                                      final boolean useSCR)
+    throws IOException {
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+      LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+      HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+      LAZY_WRITER_INTERVAL_SEC);
+
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
+      // Do not share a client context across tests.
+      conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+      sockDir = new TemporarySocketDirectory();
+      conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+          this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+      conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    }
+
+    cluster = new MiniDFSCluster
+      .Builder(conf)
+      .numDataNodes(numDataNodes)
+      .storageTypes(storageTypes != null ?
+          storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+      .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+
+    // Artificially cap the storage capacity of the RAM_DISK volume.
+    if (ramDiskStorageLimit >= 0) {
+      List<? extends FsVolumeSpi> volumes =
+        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageType() == RAM_DISK) {
+          ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+        }
+      }
+    }
+
+    LOG.info("Cluster startup complete");
+  }
+
+  protected final void startUpCluster(boolean hasTransientStorage,
+                                      final int ramDiskReplicaCapacity)
+      throws IOException {
+    startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
+  }
+
+  protected final void triggerBlockReport()
+      throws IOException, InterruptedException {
+    // Trigger block report to NN
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    Thread.sleep(10 * 1000);
+  }
+
+  protected final boolean verifyBlockDeletedFromDir(File dir,
+      LocatedBlocks locatedBlocks) {
+
+    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+      File targetDir =
+        DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+      if (blockFile.exists()) {
+        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+      File metaFile = new File(targetDir,
+        DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+          lb.getBlock().getGenerationStamp()));
+      if (metaFile.exists()) {
+        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+      throws IOException, InterruptedException {
+
+    LOG.info("Verifying replica has no saved copy after deletion.");
+    triggerBlockReport();
+
+    // Wait for any asynchronous block deletions to finish.
+    while (DataNodeTestUtils.getPendingAsyncDeletions(
+        cluster.getDataNodes().get(0)) > 0L) {
+      Thread.sleep(1000);
+    }
+
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<? extends FsVolumeSpi> volumes =
+      cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    // Make sure the deleted replica has no copy in the finalized dir of the
+    // transient volume or the lazyPersist dir of the non-transient volume.
+    for (FsVolumeSpi v : volumes) {
+      FsVolumeImpl volume = (FsVolumeImpl) v;
+      File targetDir = (v.isTransientStorage()) ?
+          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+          volume.getBlockPoolSlice(bpid).getLazypersistDir();
+      if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected final void verifyRamDiskJMXMetric(String metricName,
+      long expectedValue) throws Exception {
+    assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+  }
+
+  protected final boolean verifyReadRandomFile(
+      Path path, int fileLength, int seed) throws IOException {
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
+    byte expected[] = DFSTestUtil.
+      calculateFileContentsFromSeed(seed, fileLength);
+    return Arrays.equals(contents, expected);
+  }
+
+  private JMXGet initJMX() throws Exception {
+    JMXGet jmx = new JMXGet();
+    jmx.setService(JMX_SERVICE_NAME);
+    jmx.init();
+    return jmx;
+  }
+
+  private void printRamDiskJMXMetrics() {
+    try {
+      if (jmx != null) {
+        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}
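
With these helpers extracted into a shared base class, a concrete test
reduces to cluster startup, a write, and a storage-type assertion. A
hypothetical subclass showing the intended usage (the class and file names
are illustrative; the real subclasses follow in the rest of this patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.junit.Test;

    import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;

    public class ExampleLazyPersistTest extends LazyPersistTestCase {
      @Test
      public void testWriteLandsOnRamDisk() throws IOException {
        // One DN with a RAM_DISK + DEFAULT volume pair, uncapped RAM_DISK.
        startUpCluster(true, -1);
        Path path = new Path("/testWriteLandsOnRamDisk.01.dat");
        makeTestFile(path, BLOCK_SIZE, true);  // true => add LAZY_PERSIST
        ensureFileReplicasOnStorageType(path, RAM_DISK);
      }
    }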

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 444afed..771609c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -17,103 +17,45 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.tools.JMXGet;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class TestLazyPersistFiles {
-  public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
-
-  static {
-    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
-  }
-
+public class TestLazyPersistFiles extends LazyPersistTestCase {
   private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
 
   private static final int THREADPOOL_SIZE = 10;
 
-  private static final short REPL_FACTOR = 1;
-  private static final int BLOCK_SIZE = 5 * 1024 * 1024;
-  private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  private static final int BUFFER_LENGTH = 4096;
-  private static final int EVICTION_LOW_WATERMARK = 1;
-  private static final String JMX_SERVICE_NAME = "DataNode";
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-
-  private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
-  private DFSClient client;
-  private Configuration conf;
-  private JMXGet jmx;
-
-  @After
-  public void shutDownCluster() throws Exception {
-
-    // Dump all RamDisk JMX metrics before shutdown the cluster
-    printRamDiskJMXMetrics();
-
-    if (fs != null) {
-      fs.close();
-      fs = null;
-      client = null;
-    }
-
-    if (cluster != null) {
-      cluster.shutdownDataNodes();
-      cluster.shutdown();
-      cluster = null;
-    }
-
-    if (jmx != null) {
-      jmx = null;
-    }
-  }
-
-  @Test (timeout=300000)
+  @Test
   public void testPolicyNotSetByDefault() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -126,7 +68,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPolicyPropagation() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -138,7 +80,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPolicyPersistenceInEditLog() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -152,7 +94,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPolicyPersistenceInFsImage() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -170,7 +112,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPlacementOnRamDisk() throws IOException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -180,7 +122,7 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path, RAM_DISK);
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPlacementOnSizeLimitedRamDisk() throws IOException {
     startUpCluster(true, 3);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -199,7 +141,7 @@ public class TestLazyPersistFiles {
    * Write should default to disk. No error.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testFallbackToDisk() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -213,7 +155,7 @@ public class TestLazyPersistFiles {
    * File can not fit in RamDisk even with eviction
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testFallbackToDiskFull() throws Exception {
     startUpCluster(false, 0);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -231,7 +173,7 @@ public class TestLazyPersistFiles {
    * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testFallbackToDiskPartial()
     throws IOException, InterruptedException {
     startUpCluster(true, 2);
@@ -271,7 +213,7 @@ public class TestLazyPersistFiles {
    *
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskNotChosenByDefault() throws IOException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -289,7 +231,7 @@ public class TestLazyPersistFiles {
    * Append to lazy persist file is denied.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testAppendIsDenied() throws IOException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -310,7 +252,7 @@ public class TestLazyPersistFiles {
    * must be discarded by the NN, instead of being kept around as a
    * 'corrupt' file.
    */
-  @Test (timeout=300000)
+  @Test
   public void testLazyPersistFilesAreDiscarded()
       throws IOException, InterruptedException {
     startUpCluster(true, 2);
@@ -344,7 +286,7 @@ public class TestLazyPersistFiles {
                is(0L));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testLazyPersistBlocksAreSaved()
       throws IOException, InterruptedException {
     startUpCluster(true, -1);
@@ -399,7 +341,7 @@ public class TestLazyPersistFiles {
    * RamDisk eviction after lazy persist to disk.
    * @throws Exception
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskEviction() throws Exception {
     startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -434,7 +376,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskEvictionBeforePersist()
     throws IOException, InterruptedException {
     startUpCluster(true, 1);
@@ -459,7 +401,7 @@ public class TestLazyPersistFiles {
 
     assert(fs.exists(path1));
     assert(fs.exists(path2));
-    verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
   }
 
   /**
@@ -467,7 +409,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskEvictionIsLru()
     throws Exception {
     final int NUM_PATHS = 5;
@@ -529,7 +471,7 @@ public class TestLazyPersistFiles {
    * Memory is freed up and file is gone.
    * @throws IOException
    */
-  @Test // (timeout=300000)
+  @Test
   public void testDeleteBeforePersist()
     throws Exception {
     startUpCluster(true, -1);
@@ -556,7 +498,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testDeleteAfterPersist()
     throws Exception {
     startUpCluster(true, -1);
@@ -584,7 +526,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testDfsUsageCreateDelete()
     throws IOException, InterruptedException {
     startUpCluster(true, 4);
@@ -615,7 +557,7 @@ public class TestLazyPersistFiles {
   /**
    * Concurrent read from the same node and verify the contents.
    */
-  @Test (timeout=300000)
+  @Test
   public void testConcurrentRead()
     throws Exception {
     startUpCluster(true, 2);
@@ -666,7 +608,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testConcurrentWrites()
     throws IOException, InterruptedException {
     startUpCluster(true, 9);
@@ -702,7 +644,7 @@ public class TestLazyPersistFiles {
     assertThat(testFailed.get(), is(false));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testDnRestartWithSavedReplicas()
       throws IOException, InterruptedException {
 
@@ -726,7 +668,7 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testDnRestartWithUnsavedReplicas()
       throws IOException, InterruptedException {
 
@@ -746,183 +688,6 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
   }
 
-  // ---- Utility functions for all test cases -------------------------------
-
-  /**
-   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
-   * capped. If ramDiskStorageLimit < 0 then it is ignored.
-   */
-  private void startUpCluster(boolean hasTransientStorage,
-                              final int ramDiskReplicaCapacity,
-                              final boolean useSCR)
-      throws IOException {
-
-    conf = new Configuration();
-    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
-                LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
-    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
-    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-                HEARTBEAT_RECHECK_INTERVAL_MSEC);
-    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-                LAZY_WRITER_INTERVAL_SEC);
-    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
-                EVICTION_LOW_WATERMARK * BLOCK_SIZE);
-
-    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
-
-    long[] capacities = null;
-    if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
-      // Convert replica count to byte count, add some delta for .meta and VERSION files.
-      long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
-      capacities = new long[] { ramDiskStorageLimit, -1 };
-    }
-
-    cluster = new MiniDFSCluster
-        .Builder(conf)
-        .numDataNodes(REPL_FACTOR)
-        .storageCapacities(capacities)
-        .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
-        .build();
-    fs = cluster.getFileSystem();
-    client = fs.getClient();
-    try {
-      jmx = initJMX();
-    } catch (Exception e) {
-      fail("Failed initialize JMX for testing: " + e);
-    }
-    LOG.info("Cluster startup complete");
-  }
-
-  private void startUpCluster(boolean hasTransientStorage,
-                              final int ramDiskReplicaCapacity)
-    throws IOException {
-    startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
-  }
-
-  private void makeTestFile(Path path, long length, final boolean isLazyPersist)
-      throws IOException {
-
-    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
-
-    if (isLazyPersist) {
-      createFlags.add(LAZY_PERSIST);
-    }
-
-    FSDataOutputStream fos = null;
-    try {
-      fos =
-          fs.create(path,
-              FsPermission.getFileDefault(),
-              createFlags,
-              BUFFER_LENGTH,
-              REPL_FACTOR,
-              BLOCK_SIZE,
-              null);
-
-      // Allocate a block.
-      byte[] buffer = new byte[BUFFER_LENGTH];
-      for (int bytesWritten = 0; bytesWritten < length; ) {
-        fos.write(buffer, 0, buffer.length);
-        bytesWritten += buffer.length;
-      }
-      if (length > 0) {
-        fos.hsync();
-      }
-    } finally {
-      IOUtils.closeQuietly(fos);
-    }
-  }
-
-  private LocatedBlocks ensureFileReplicasOnStorageType(
-      Path path, StorageType storageType) throws IOException {
-    // Ensure that returned block locations returned are correct!
-    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
-    assertThat(fs.exists(path), is(true));
-    long fileLength = client.getFileInfo(path.toString()).getLen();
-    LocatedBlocks locatedBlocks =
-        client.getLocatedBlocks(path.toString(), 0, fileLength);
-    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
-    }
-    return locatedBlocks;
-  }
-
-  private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
-                                  long seed) throws IOException {
-    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
-      BLOCK_SIZE, REPL_FACTOR, seed, true);
-  }
-
-  private boolean verifyReadRandomFile(
-    Path path, int fileLength, int seed) throws IOException {
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
-    byte expected[] = DFSTestUtil.
-      calculateFileContentsFromSeed(seed, fileLength);
-    return Arrays.equals(contents, expected);
-  }
-
-  private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-    throws IOException, InterruptedException {
-
-    LOG.info("Verifying replica has no saved copy after deletion.");
-    triggerBlockReport();
-
-    while(
-      DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
-        > 0L){
-      Thread.sleep(1000);
-    }
-
-    final String bpid = cluster.getNamesystem().getBlockPoolId();
-    List<? extends FsVolumeSpi> volumes =
-      cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
-    // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
-    for (FsVolumeSpi v : volumes) {
-      FsVolumeImpl volume = (FsVolumeImpl) v;
-      File targetDir = (v.isTransientStorage()) ?
-          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-          volume.getBlockPoolSlice(bpid).getLazypersistDir();
-      if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
-
-    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-      File targetDir =
-        DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
-
-      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
-      if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-          " exists after deletion.");
-        return false;
-      }
-      File metaFile = new File(targetDir,
-        DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
-          lb.getBlock().getGenerationStamp()));
-      if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-          " exists after deletion.");
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private void triggerBlockReport()
-    throws IOException, InterruptedException {
-    // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
-  }
-
   class WriterRunnable implements Runnable {
     private final int id;
     private final Path paths[];
@@ -960,27 +725,4 @@ public class TestLazyPersistFiles {
       }
     }
   }
-
-  JMXGet initJMX() throws Exception
-  {
-    JMXGet jmx = new JMXGet();
-    jmx.setService(JMX_SERVICE_NAME);
-    jmx.init();
-    return jmx;
-  }
-
-  void printRamDiskJMXMetrics() {
-    try {
-      if (jmx != null) {
-        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  void verifyRamDiskJMXMetric(String metricName, long expectedValue)
-      throws Exception {
-    assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
-  }
 }
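
The JMX helpers retained in the base class (initJMX, printRamDiskJMXMetrics,
verifyRamDiskJMXMetric) wrap the stock JMXGet tool. A standalone sketch of
the same query pattern against a running DataNode; the metric name
"RamDiskBlocksWrite" is an assumption for illustration, not taken from this
patch:

    import org.apache.hadoop.hdfs.tools.JMXGet;

    public class RamDiskMetricsSketch {
      public static void main(String[] args) throws Exception {
        JMXGet jmx = new JMXGet();
        jmx.setService("DataNode");  // same service name the tests use
        jmx.init();                  // connect to the DataNode MBeans

        // Dump the whole RamDisk* metric family, then read a single value.
        jmx.printAllMatchedAttributes("^RamDisk");
        long writes = Long.parseLong(jmx.getValue("RamDiskBlocksWrite"));
        System.out.println("RamDiskBlocksWrite = " + writes);
      }
    }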

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
index b6ac287..efc6dcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -15,84 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-  package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-  import org.apache.commons.io.IOUtils;
-  import org.apache.commons.logging.Log;
-  import org.apache.commons.logging.LogFactory;
-  import org.apache.commons.logging.impl.Log4JLogger;
-  import org.apache.hadoop.conf.Configuration;
-  import org.apache.hadoop.fs.CreateFlag;
-  import org.apache.hadoop.fs.FSDataInputStream;
-  import org.apache.hadoop.fs.FSDataOutputStream;
-  import org.apache.hadoop.fs.Path;
-  import org.apache.hadoop.fs.permission.FsPermission;
-  import org.apache.hadoop.hdfs.*;
-  import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-  import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-  import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-  import org.apache.hadoop.hdfs.server.datanode.DataNode;
-  import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-  import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-  import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-  import org.apache.hadoop.hdfs.server.namenode.NameNode;
-  import org.apache.hadoop.net.unix.DomainSocket;
-  import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-  import org.apache.hadoop.security.UserGroupInformation;
-  import org.apache.hadoop.test.GenericTestUtils;
-  import org.apache.hadoop.util.NativeCodeLoader;
-  import org.apache.log4j.Level;
-  import org.junit.*;
-
-  import java.io.File;
-  import java.io.IOException;
-  import java.util.Arrays;
-  import java.util.EnumSet;
-  import java.util.List;
-  import java.util.UUID;
-
-  import static org.apache.hadoop.fs.CreateFlag.CREATE;
-  import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
-  import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-  import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
-  import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
-  import static org.hamcrest.CoreMatchers.equalTo;
-  import static org.hamcrest.core.Is.is;
-  import static org.junit.Assert.assertThat;
-
-public class TestScrLazyPersistFiles {
-  public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
-
-  static {
-    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
-  }
-
-  private static short REPL_FACTOR = 1;
-  private static final int BLOCK_SIZE = 10485760;   // 10 MB
-  private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  private static final int BUFFER_LENGTH = 4096;
-  private static TemporarySocketDirectory sockDir;
-
-  private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
-  private DFSClient client;
-  private Configuration conf;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class TestScrLazyPersistFiles extends LazyPersistTestCase {
 
   @BeforeClass
   public static void init() {
-    sockDir = new TemporarySocketDirectory();
     DomainSocket.disableBindPathValidation();
   }
 
-  @AfterClass
-  public static void shutdown() throws IOException {
-    sockDir.close();
-  }
-
   @Before
   public void before() {
     Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
@@ -100,26 +60,14 @@ public class TestScrLazyPersistFiles {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
   }
 
-  @After
-  public void shutDownCluster() throws IOException {
-    if (fs != null) {
-      fs.close();
-      fs = null;
-      client = null;
-    }
-
-    if (cluster != null) {
-      cluster.shutdownDataNodes();
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
 
   /**
    * Read in-memory block with Short Circuit Read
    * Note: the test uses faked RAM_DISK from physical disk.
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskShortCircuitRead()
     throws IOException, InterruptedException {
     startUpCluster(REPL_FACTOR,
@@ -160,7 +108,7 @@ public class TestScrLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000000)
+  @Test
   public void testRamDiskEvictionWithShortCircuitReadHandle()
     throws IOException, InterruptedException {
     startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
@@ -204,123 +152,149 @@ public class TestScrLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
-  // ---- Utility functions for all test cases -------------------------------
+  @Test
+  public void testShortCircuitReadAfterEviction()
+      throws IOException, InterruptedException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+    doShortCircuitReadAfterEvictionTest();
+  }
 
-  /**
-   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
-   * capped. If ramDiskStorageLimit < 0 then it is ignored.
-   */
-  private void startUpCluster(final int numDataNodes,
-                              final StorageType[] storageTypes,
-                              final long ramDiskStorageLimit,
-                              final boolean useSCR)
-    throws IOException {
-
-    conf = new Configuration();
-    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
-      LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
-    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
-    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-      HEARTBEAT_RECHECK_INTERVAL_MSEC);
-    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-      LAZY_WRITER_INTERVAL_SEC);
-
-    if (useSCR)
-    {
-      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
-      conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
-        UUID.randomUUID().toString());
-      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-        new File(sockDir.getDir(),
-          "TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
-      conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
-    }
+  @Test
+  public void testLegacyShortCircuitReadAfterEviction()
+      throws IOException, InterruptedException {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+    doShortCircuitReadAfterEvictionTest();
+  }
 
-    REPL_FACTOR = 1; //Reset in case a test has modified the value
-
-    cluster = new MiniDFSCluster
-      .Builder(conf)
-      .numDataNodes(numDataNodes)
-      .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
-      .build();
-    fs = cluster.getFileSystem();
-    client = fs.getClient();
-
-    // Artificially cap the storage capacity of the RAM_DISK volume.
-    if (ramDiskStorageLimit >= 0) {
-      List<? extends FsVolumeSpi> volumes =
-        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
-      for (FsVolumeSpi volume : volumes) {
-        if (volume.getStorageType() == RAM_DISK) {
-          ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
-        }
-      }
+  private void doShortCircuitReadAfterEvictionTest() throws IOException,
+      InterruptedException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+
+    // Verify short-circuit read from RAM_DISK.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // Verify short-circuit read from RAM_DISK once again.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+    // Create another file with a replica on RAM_DISK, which evicts the first.
+    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Verify short-circuit read still works from DEFAULT storage.  This time,
+    // we'll have a checksum written during lazy persistence.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+    metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+    // In the implementation of legacy short-circuit reads, any failure is
+    // trapped silently, reverts back to a remote read, and also disables all
+    // subsequent legacy short-circuit reads in the ClientContext.  If the test
+    // uses legacy, then assert that it didn't get disabled.
+    ClientContext clientContext = client.getClientContext();
+    if (clientContext.getUseLegacyBlockReaderLocal()) {
+      Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
     }
+  }
 
-    LOG.info("Cluster startup complete");
+  @Test
+  public void testShortCircuitReadBlockFileCorruption() throws IOException,
+      InterruptedException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+    doShortCircuitReadBlockFileCorruptionTest();
   }
 
-  private void makeTestFile(Path path, long length, final boolean isLazyPersist)
-    throws IOException {
+  @Test
+  public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
+      InterruptedException {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+    doShortCircuitReadBlockFileCorruptionTest();
+  }
 
-    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+  private void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
+      InterruptedException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
-    if (isLazyPersist) {
-      createFlags.add(LAZY_PERSIST);
-    }
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    FSDataOutputStream fos = null;
-    try {
-      fos =
-        fs.create(path,
-          FsPermission.getFileDefault(),
-          createFlags,
-          BUFFER_LENGTH,
-          REPL_FACTOR,
-          BLOCK_SIZE,
-          null);
-
-      // Allocate a block.
-      byte[] buffer = new byte[BUFFER_LENGTH];
-      for (int bytesWritten = 0; bytesWritten < length; ) {
-        fos.write(buffer, 0, buffer.length);
-        bytesWritten += buffer.length;
-      }
-      if (length > 0) {
-        fos.hsync();
-      }
-    } finally {
-      IOUtils.closeQuietly(fos);
-    }
+    // Create another file with a replica on RAM_DISK, which evicts the first.
+    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Corrupt the lazy-persisted block file, and verify that checksum
+    // verification catches it.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+    MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
+    exception.expect(ChecksumException.class);
+    DFSTestUtil.readFileBuffer(fs, path1);
   }
 
-  private LocatedBlocks ensureFileReplicasOnStorageType(
-    Path path, StorageType storageType) throws IOException {
-    // Ensure that returned block locations returned are correct!
-    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
-    assertThat(fs.exists(path), is(true));
-    long fileLength = client.getFileInfo(path.toString()).getLen();
-    LocatedBlocks locatedBlocks =
-      client.getLocatedBlocks(path.toString(), 0, fileLength);
-    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
-    }
-    return locatedBlocks;
+  @Test
+  public void testShortCircuitReadMetaFileCorruption() throws IOException,
+      InterruptedException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+    doShortCircuitReadMetaFileCorruptionTest();
   }
 
-  private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
-                                  long seed) throws IOException {
-    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
-      BLOCK_SIZE, REPL_FACTOR, seed, true);
+  @Test
+  public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
+      InterruptedException {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+    doShortCircuitReadMetaFileCorruptionTest();
   }
 
-  private void triggerBlockReport()
-    throws IOException, InterruptedException {
-    // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+  private void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
+      InterruptedException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Create another file with a replica on RAM_DISK, which evicts the first.
+    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Corrupt the lazy-persisted checksum file, and verify that checksum
+    // verification catches it.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+    File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    MiniDFSCluster.corruptBlock(metaFile);
+    exception.expect(ChecksumException.class);
+    DFSTestUtil.readFileBuffer(fs, path1);
   }
 }
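
The corruption tests above hinge on the core change in HDFS-6934: while a
replica sits on RAM_DISK its .meta file holds only the BlockMetadataHeader
(checksum computation is skipped on the write hot path), and checksums are
appended only when the lazy writer persists the block to disk. A condensed
helper capturing that check, reusing the LazyPersistTestCase fields above
(the class and method names are illustrative):

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;

    public class MetaFileSizeSketch extends LazyPersistTestCase {
      // True iff the first block's .meta file is header-only, meaning no
      // checksums exist yet; expected while the replica is on RAM_DISK.
      protected boolean isHeaderOnlyMetaFile(Path path) throws IOException {
        File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
            DFSTestUtil.getFirstBlock(fs, path));
        return metaFile.length() <= BlockMetadataHeader.getHeaderSize();
      }
    }

While the replica is on RAM_DISK this returns true; after the lazy writer
persists the block and eviction moves it to DEFAULT storage, checksums
follow the header and it returns false, which is exactly what
doShortCircuitReadAfterEvictionTest asserts.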

