hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject [10/34] git commit: HDFS-6950. Add Additional unit tests for HDFS-6581. (Contributed by Xiaoyu Yao)
Date Fri, 17 Oct 2014 23:30:57 GMT
HDFS-6950. Add Additional unit tests for HDFS-6581. (Contributed by Xiaoyu Yao)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4609ba0d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4609ba0d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4609ba0d

Branch: refs/heads/branch-2.6
Commit: 4609ba0dc899f3c488bff3ec00d48f6938facd6b
Parents: 754ba3a
Author: arp <arp@apache.org>
Authored: Wed Sep 3 10:51:26 2014 -0700
Committer: Jitendra Pandey <Jitendra@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:50 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  45 +-
 .../fsdataset/impl/TestLazyPersistFiles.java    | 543 +++++++++++++++++--
 2 files changed, 537 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4609ba0d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 8c987c4..820b812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -94,6 +94,11 @@ import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -305,16 +310,29 @@ public class DFSTestUtil {
   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
                                 long fileLen, long blockSize, short replFactor, long seed)
       throws IOException {
-    assert bufferLen > 0;
-    if (!fs.mkdirs(fileName.getParent())) {
+    createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
+            replFactor, seed, false);
+  }
+
+  public static void createFile(FileSystem fs, Path fileName,
+      boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+      short replFactor, long seed, boolean flush) throws IOException {
+  assert bufferLen > 0;
+  if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " +
-                            fileName.getParent().toString());
-    }
-    FSDataOutputStream out = null;
-    try {
-      out = fs.create(fileName, true, fs.getConf()
-                                        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-                      replFactor, blockSize);
+                fileName.getParent().toString());
+  }
+  FSDataOutputStream out = null;
+  EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+  createFlags.add(OVERWRITE);
+  if (isLazyPersist) {
+    createFlags.add(LAZY_PERSIST);
+  }
+  try {
+      out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
+        fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        replFactor, blockSize, null);
+
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);
@@ -322,10 +340,13 @@ public class DFSTestUtil {
         while (bytesToWrite>0) {
           rb.nextBytes(toWrite);
           int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
-              : (int) bytesToWrite;
+            : (int) bytesToWrite;
 
-          out.write(toWrite, 0, bytesToWriteNext);
-          bytesToWrite -= bytesToWriteNext;
+            out.write(toWrite, 0, bytesToWriteNext);
+            bytesToWrite -= bytesToWriteNext;
+        }
+        if (flush) {
+          out.hsync();
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4609ba0d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index cac99a7..461c44d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,46 +16,49 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
-import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
-import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Test;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 public class TestLazyPersistFiles {
   public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
@@ -66,8 +69,10 @@ public class TestLazyPersistFiles {
     ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
   }
 
+  private static final int THREADPOOL_SIZE = 10;
+
   private static short REPL_FACTOR = 1;
-  private static final long BLOCK_SIZE = 10485760;   // 10 MB
+  private static final int BLOCK_SIZE = 10485760;   // 10 MB
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@@ -162,6 +167,26 @@ public class TestLazyPersistFiles {
   }
 
   @Test (timeout=300000)
+  public void testPlacementOnSizeLimitedRamDisk() throws IOException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
+      3 * BLOCK_SIZE -1); // 2 replicas + delta
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    makeTestFile(path1, BLOCK_SIZE, true);
+    makeTestFile(path2, BLOCK_SIZE, true);
+
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    ensureFileReplicasOnStorageType(path2, RAM_DISK);
+  }
+
+  /**
+   * Client tries to write LAZY_PERSIST to same DN with no RamDisk configured
+   * Write should default to disk. No error.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
   public void testFallbackToDisk() throws IOException {
     startUpCluster(REPL_FACTOR, null, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -172,6 +197,59 @@ public class TestLazyPersistFiles {
   }
 
   /**
+   * File can not fit in RamDisk even with eviction
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testFallbackToDiskFull() throws IOException {
+    startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, DEFAULT);
+  }
+
+  /**
+   * File partially fits in RamDisk after eviction.
+   * RamDisk can fit 2 blocks. Write a file with 5 blocks.
+   * Expect 2 blocks are on RamDisk whereas other 3 on disk.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testFallbackToDiskPartial()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR,
+        new StorageType[] { RAM_DISK, DEFAULT },
+        BLOCK_SIZE * 3 - 1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE * 5, true);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    triggerBlockReport();
+
+    int numBlocksOnRamDisk = 0;
+    int numBlocksOnDisk = 0;
+
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+      client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
+        numBlocksOnRamDisk++;
+      }else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
+        numBlocksOnDisk++;
+      }
+    }
+    assertThat(numBlocksOnRamDisk, is(2));
+    assertThat(numBlocksOnDisk, is(3));
+  }
+
+  /**
    * If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
    * specified, then block placement should fail.
    *
@@ -191,6 +269,10 @@ public class TestLazyPersistFiles {
     }
   }
 
+  /**
+   * Append to lazy persist file is denied.
+   * @throws IOException
+   */
   @Test (timeout=300000)
   public void testAppendIsDenied() throws IOException {
     startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
@@ -216,7 +298,7 @@ public class TestLazyPersistFiles {
   public void testLazyPersistFilesAreDiscarded()
       throws IOException, InterruptedException {
     startUpCluster(REPL_FACTOR,
-                   new StorageType[] {RAM_DISK, DEFAULT },
+                   new StorageType[] { RAM_DISK, DEFAULT },
                    (2 * BLOCK_SIZE - 1));   // 1 replica + delta.
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -256,7 +338,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testLazyPersistBlocksAreSaved()
       throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -302,8 +384,13 @@ public class TestLazyPersistFiles {
     assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
   }
 
-
-  @Test (timeout=300000)
+  /**
+   * RamDisk eviction after lazy persist to disk.
+   * Evicted blocks are still readable with on-disk replicas.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+ @Test (timeout=300000)
   public void testRamDiskEviction()
       throws IOException, InterruptedException {
     startUpCluster(REPL_FACTOR,
@@ -313,7 +400,8 @@ public class TestLazyPersistFiles {
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
     Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
-    makeTestFile(path1, BLOCK_SIZE, true);
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
     // Sleep for a short time to allow the lazy writer thread to do its job.
@@ -323,15 +411,268 @@ public class TestLazyPersistFiles {
 
     // Create another file with a replica on RAM_DISK.
     makeTestFile(path2, BLOCK_SIZE, true);
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+    triggerBlockReport();
 
     // Make sure that the second file's block replica is on RAM_DISK, whereas
     // the original file's block replica is now on disk.
-    ensureFileReplicasOnStorageType(path2, RAM_DISK);
+//    ensureFileReplicasOnStorageType(path2, RAM_DISK);
     ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
+  /**
+   * RamDisk eviction should not happen on blocks that are not yet
+   * persisted on disk.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testRamDiskEvictionBeforePersist()
+    throws IOException, InterruptedException {
+    // 1 replica + delta, lazy persist interval every 50 minutes
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+      (2 * BLOCK_SIZE - 1));
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+    final int SEED = 0XFADED;
+
+    // Stop lazy writer to ensure block for path1 is not persisted to disk.
+    stopLazyWriter(cluster.getDataNodes().get(0));
+
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Create second file with a replica on RAM_DISK.
+    makeTestFile(path2, BLOCK_SIZE, true);
+
+    // Eviction should not happen for block of the first file that is not
+    // persisted yet.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    ensureFileReplicasOnStorageType(path2, DEFAULT);
+
+    assert(fs.exists(path1));
+    assert(fs.exists(path2));
+    verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
+  }
+
+  /**
+   * Validates lazy persisted blocks are evicted from RAM_DISK based on LRU.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testRamDiskEvictionLRU()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+      (4 * BLOCK_SIZE -1));  // 3 replica + delta.
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final int NUM_PATHS = 6;
+    Path paths[] = new Path[NUM_PATHS];
+
+    for (int i = 0; i < NUM_PATHS; i++) {
+      paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
+    }
+
+    // No eviction for the first half of files
+    for (int i = 0; i < NUM_PATHS/2; i++) {
+      makeTestFile(paths[i], BLOCK_SIZE, true);
+      ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+    }
+
+    // Lazy persist writer persists the first half of files
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // Create the second half of files with eviction upon each create.
+    for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
+      makeTestFile(paths[i], BLOCK_SIZE, true);
+      ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+
+      // path[i-NUM_PATHS/2] is expected to be evicted by LRU
+      triggerBlockReport();
+      ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
+    }
+  }
+
+  /**
+   * Delete lazy-persist file that has not been persisted to disk.
+   * Memory is freed up and file is gone.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testDeleteBeforePersist()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+      -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    stopLazyWriter(cluster.getDataNodes().get(0));
+
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    makeTestFile(path, BLOCK_SIZE, true);
+    LocatedBlocks locatedBlocks =
+      ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+    // Delete before persist
+    client.delete(path.toString(), false);
+    Assert.assertFalse(fs.exists(path));
+
+    assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+  }
+
+  /**
+   * Delete lazy-persist file that has been persisted to disk
+   * Both memory blocks and disk blocks are deleted.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testDeleteAfterPersist()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // Delete after persist
+    client.delete(path.toString(), false);
+    Assert.assertFalse(fs.exists(path));
+
+    triggerBlockReport();
+
+    assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+  }
+
+  /**
+   * RAM_DISK used/free space
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testDfsUsageCreateDelete()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+      5 * BLOCK_SIZE - 1);  // 4 replica + delta
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    // Get the usage before write BLOCK_SIZE
+    long usedBeforeCreate = fs.getUsed();
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    long usedAfterCreate = fs.getUsed();
+
+    assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    long usedAfterPersist = fs.getUsed();
+    assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
+
+    // Delete after persist
+    client.delete(path.toString(), false);
+    long usedAfterDelete = fs.getUsed();
+
+    assertThat(usedBeforeCreate, is(usedAfterDelete));
+  }
+
+  /**
+   * Concurrent read from the same node and verify the contents.
+   */
+  @Test (timeout=300000)
+  public void testConcurrentRead()
+    throws Exception {
+    startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
+      3 * BLOCK_SIZE -1); // 2 replicas + delta
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path1 = new Path("/" + METHOD_NAME + ".dat");
+
+    final int SEED = 0xFADED;
+    final int NUM_TASKS = 5;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    //Read from multiple clients
+    final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+    Runnable readerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+        } catch (Throwable e) {
+          LOG.error("readerRunnable error", e);
+          testFailed.set(true);
+        } finally {
+          latch.countDown();
+        }
+      }
+    };
+
+    Thread threads[] = new Thread[NUM_TASKS];
+    for (int i = 0; i < NUM_TASKS; i++) {
+      threads[i] = new Thread(readerRunnable);
+      threads[i].start();
+    }
+
+    Thread.sleep(500);
+
+    for (int i = 0; i < NUM_TASKS; i++) {
+      Uninterruptibles.joinUninterruptibly(threads[i]);
+    }
+    Assert.assertFalse(testFailed.get());
+  }
+
+  /**
+   * Concurrent write with eviction
+   * RAM_DISK can hold 9 replicas
+   * 4 threads each write 5 replicas
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testConcurrentWrites()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+      (10 * BLOCK_SIZE -1)); // 9 replica + delta.
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final int SEED = 0xFADED;
+    final int NUM_WRITERS = 4;
+    final int NUM_WRITER_PATHS = 5;
+
+    Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
+    for (int i = 0; i < NUM_WRITERS; i++) {
+      paths[i] = new Path[NUM_WRITER_PATHS];
+      for (int j = 0; j < NUM_WRITER_PATHS; j++) {
+        paths[i][j] =
+          new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
+      }
+    }
+
+    final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
+    for (int i = 0; i < NUM_WRITERS; i++) {
+      Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch,
+                                           testFailed);
+      executor.execute(writer);
+    }
+
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Wait for every writer task to count down the latch before checking the result.
+    latch.await();
+
+    assertThat(testFailed.get(), is(false));
+  }
+
   @Test (timeout=300000)
   public void testDnRestartWithSavedReplicas()
       throws IOException, InterruptedException {
@@ -384,11 +725,12 @@ public class TestLazyPersistFiles {
 
   /**
    * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
-   * capped. If tmpfsStorageLimit < 0 then it is ignored.
+   * capped. If ramDiskStorageLimit < 0 then it is ignored.
    */
   private void startUpCluster(final int numDataNodes,
                               final StorageType[] storageTypes,
-                              final long ramDiskStorageLimit)
+                              final long ramDiskStorageLimit,
+                              final boolean useSCR)
       throws IOException {
 
     conf = new Configuration();
@@ -397,11 +739,13 @@ public class TestLazyPersistFiles {
                 LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-                HEARTBEAT_RECHECK_INTERVAL_MSEC);
+      HEARTBEAT_RECHECK_INTERVAL_MSEC);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-                LAZY_WRITER_INTERVAL_SEC);
+      LAZY_WRITER_INTERVAL_SEC);
+
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
 
-    REPL_FACTOR = 1; //Reset if case a test has modified the value
+    REPL_FACTOR = 1; //Reset in case a test has modified the value
 
     cluster = new MiniDFSCluster
         .Builder(conf)
@@ -411,7 +755,7 @@ public class TestLazyPersistFiles {
     fs = cluster.getFileSystem();
     client = fs.getClient();
 
-    // Artifically cap the storage capacity of the tmpfs volume.
+    // Artificially cap the storage capacity of the RAM_DISK volume.
     if (ramDiskStorageLimit >= 0) {
       List<? extends FsVolumeSpi> volumes =
           cluster.getDataNodes().get(0).getFSDataset().getVolumes();
@@ -426,6 +770,13 @@ public class TestLazyPersistFiles {
     LOG.info("Cluster startup complete");
   }
 
+  private void startUpCluster(final int numDataNodes,
+                              final StorageType[] storageTypes,
+                              final long ramDiskStorageLimit)
+    throws IOException {
+    startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
+  }
+
   private void makeTestFile(Path path, long length, final boolean isLazyPersist)
       throws IOException {
 
@@ -435,9 +786,7 @@ public class TestLazyPersistFiles {
       createFlags.add(LAZY_PERSIST);
     }
 
-
     FSDataOutputStream fos = null;
-
     try {
       fos =
           fs.create(path,
@@ -465,13 +814,14 @@ public class TestLazyPersistFiles {
   private LocatedBlocks ensureFileReplicasOnStorageType(
       Path path, StorageType storageType) throws IOException {
     // Ensure that returned block locations returned are correct!
+    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    assertThat(fs.exists(path), is(true));
     long fileLength = client.getFileInfo(path.toString()).getLen();
     LocatedBlocks locatedBlocks =
         client.getLocatedBlocks(path.toString(), 0, fileLength);
     for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
       assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
     }
-
     return locatedBlocks;
   }
 
@@ -480,4 +830,119 @@ public class TestLazyPersistFiles {
     FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
     ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
   }
+
+  private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
+                                  long seed) throws IOException {
+    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+      BLOCK_SIZE, REPL_FACTOR, seed, true);
+  }
+
+  private boolean verifyReadRandomFile(
+    Path path, int fileLength, int seed) throws IOException {
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
+    byte expected[] = DFSTestUtil.
+      calculateFileContentsFromSeed(seed, fileLength);
+    return Arrays.equals(contents, expected);
+  }
+
+  private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+    throws IOException, InterruptedException {
+
+    LOG.info("Verifying replica has no saved copy after deletion.");
+    triggerBlockReport();
+
+    while(
+      DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
+        > 0L){
+      Thread.sleep(1000);
+    }
+
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<? extends FsVolumeSpi> volumes =
+      cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    // Make sure deleted replica does not have a copy on either finalized dir of
+    // transient volume or finalized dir of non-transient volume
+    for (FsVolumeSpi v : volumes) {
+      FsVolumeImpl volume = (FsVolumeImpl) v;
+      File targetDir = (v.isTransientStorage()) ?
+          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+          volume.getBlockPoolSlice(bpid).getLazypersistDir();
+      if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
+
+    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+      File targetDir =
+        DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+      if (blockFile.exists()) {
+        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+      File metaFile = new File(targetDir,
+        DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+          lb.getBlock().getGenerationStamp()));
+      if (metaFile.exists()) {
+        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void triggerBlockReport()
+    throws IOException, InterruptedException {
+    // Trigger block report to NN
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    Thread.sleep(10 * 1000);
+  }
+
+  class WriterRunnable implements Runnable {
+    private final int id;
+    private final MiniDFSCluster cluster;
+    private final Path paths[];
+    private final int seed;
+    private CountDownLatch latch;
+    private AtomicBoolean bFail;
+
+    public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths,
+                          int seed, CountDownLatch latch,
+                          AtomicBoolean bFail) {
+      id = threadIndex;
+      this.cluster = cluster;
+      this.paths = paths;
+      this.seed = seed;
+      this.latch = latch;
+      this.bFail = bFail;
+      System.out.println("Creating Writer: " + id);
+    }
+
+    public void run() {
+      System.out.println("Writer " + id + " starting... ");
+      int i = 0;
+      try {
+        for (i = 0; i < paths.length; i++) {
+          makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
+          // eviction may fail when all blocks are not persisted yet.
+          // ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+        }
+      } catch (IOException e) {
+        bFail.set(true);
+        LOG.error("Writer exception: writer id:" + id +
+          " testfile: " + paths[i].toString() +
+          " " + e);
+      } finally {
+        latch.countDown();
+      }
+    }
+  }
 }


Mime
View raw message