hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [48/50] [abbrv] hadoop git commit: HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal)
Date Sat, 20 Jun 2015 21:25:53 GMT
HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c7d022b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c7d022b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c7d022b6

Branch: refs/heads/HDFS-7240
Commit: c7d022b66f0c5baafbb7000a435c1d6e39906efe
Parents: 658b5c8
Author: Arpit Agarwal <arp@apache.org>
Authored: Sat Jun 20 13:27:52 2015 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Sat Jun 20 13:27:52 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 -
 .../datanode/fsdataset/impl/FsDatasetCache.java |   7 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  98 +++++-----
 .../hdfs/server/balancer/TestBalancer.java      |   1 -
 .../fsdataset/impl/LazyPersistTestCase.java     |  42 ++--
 .../impl/TestLazyPersistLockedMemory.java       |  25 ++-
 .../impl/TestLazyPersistReplicaPlacement.java   |  36 +++-
 .../datanode/fsdataset/impl/TestLazyWriter.java |  62 +++---
 .../fsdataset/impl/TestScrLazyPersistFiles.java | 193 +++++++------------
 10 files changed, 224 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index aad3c25..2e030b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -658,6 +658,9 @@ Release 2.8.0 - UNRELEASED
     do not generate spurious reconfig warnings (Lei (Eddy) Xu via Colin P.
     McCabe)
 
+    HDFS-8192. Eviction should key off used locked memory instead of
+    ram disk free space. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5ce2863..30540a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -94,10 +94,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
   public static final String  DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
   public static final Class<RamDiskReplicaLruTracker>  DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
-  public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
-  public static final float   DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
-  public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
-  public static final long    DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
   public static final String  DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
   public static final int     DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 6f524b2..f70d4af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -405,6 +405,13 @@ public class FsDatasetCache {
   }
 
   /**
+   * Round up to the OS page size.
+   */
+  long roundUpPageSize(long count) {
+    return usedBytesCount.rounder.roundUp(count);
+  }
+
+  /**
    * Background worker that mmaps, mlocks, and checksums a block
    */
   private class CachingTask implements Runnable {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8ebd214..a1ff918 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1302,14 +1302,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (allowLazyPersist &&
         lazyWriter != null &&
         b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
-        (cacheManager.reserve(b.getNumBytes())) > 0) {
+        reserveLockedMemory(b.getNumBytes())) {
       try {
         // First try to place the block on a transient volume.
         ref = volumes.getNextTransientVolume(b.getNumBytes());
         datanode.getMetrics().incrRamDiskBlocksWrite();
       } catch(DiskOutOfSpaceException de) {
         // Ignore the exception since we just fall back to persistent storage.
-        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
       } finally {
         if (ref == null) {
           cacheManager.release(b.getNumBytes());
@@ -1323,6 +1322,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
     // create an rbw file to hold block in the designated volume
+
+    if (allowLazyPersist && !v.isTransientStorage()) {
+      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+    }
+
     File f;
     try {
       f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
@@ -2833,20 +2837,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
-    final float lowWatermarkFreeSpacePercentage;
-    final long lowWatermarkFreeSpaceBytes;
-
 
     public LazyWriter(Configuration conf) {
       this.checkpointerInterval = conf.getInt(
           DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
           DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
-      this.lowWatermarkFreeSpacePercentage = conf.getFloat(
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
-      this.lowWatermarkFreeSpaceBytes = conf.getLong(
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT);
     }
 
     /**
@@ -2908,41 +2903,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return succeeded;
     }
 
-    private boolean transientFreeSpaceBelowThreshold() throws IOException {
-      long free = 0;
-      long capacity = 0;
-      float percentFree = 0.0f;
-
-      // Don't worry about fragmentation for now. We don't expect more than one
-      // transient volume per DN.
-      try (FsVolumeReferences volumes = getFsVolumeReferences()) {
-        for (FsVolumeSpi fvs : volumes) {
-          FsVolumeImpl v = (FsVolumeImpl) fvs;
-          if (v.isTransientStorage()) {
-            capacity += v.getCapacity();
-            free += v.getAvailable();
-          }
-        }
-      }
-
-      if (capacity == 0) {
-        return false;
-      }
-
-      percentFree = (float) ((double)free * 100 / capacity);
-      return (percentFree < lowWatermarkFreeSpacePercentage) ||
-          (free < lowWatermarkFreeSpaceBytes);
-    }
-
     /**
      * Attempt to evict one or more transient block replicas until we
-     * have at least spaceNeeded bytes free.
+     * have at least bytesNeeded bytes free.
      */
-    private void evictBlocks() throws IOException {
+    public void evictBlocks(long bytesNeeded) throws IOException {
       int iterations = 0;
 
+      final long cacheCapacity = cacheManager.getCacheCapacity();
+
       while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
-             transientFreeSpaceBelowThreshold()) {
+             (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
         RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
 
         if (replicaState == null) {
@@ -2959,7 +2930,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         final String bpid = replicaState.getBlockPoolId();
 
         synchronized (FsDatasetImpl.this) {
-          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
+          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
+                                       replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
           blockFile = replicaInfo.getBlockFile();
           metaFile = replicaInfo.getMetaFile();
@@ -2968,7 +2940,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
               replicaState.getBlockId(), false);
 
-          // Move the replica from lazyPersist/ to finalized/ on target volume
+          // Move the replica from lazyPersist/ to finalized/ on
+          // the target volume
           BlockPoolSlice bpSlice =
               replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
           File newBlockFile = bpSlice.activateSavedReplica(
@@ -2992,10 +2965,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           if (replicaState.getNumReads() == 0) {
             datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
           }
-        }
 
-        removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
-            blockFileUsed, metaFileUsed, bpid);
+          // Delete the block+meta files from RAM disk and release locked
+          // memory.
+          removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
+              blockFileUsed, metaFileUsed, bpid);
+        }
       }
     }
 
@@ -3006,7 +2981,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       while (fsRunning && shouldRun) {
         try {
           numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
-          evictBlocks();
 
           // Sleep if we have no more work to do or if it looks like we are not
           // making any forward progress. This is to ensure that if all persist
@@ -3094,5 +3068,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       cacheManager.releaseRoundDown(count);
     }
   }
+
+  /**
+   * Attempt to evict blocks from cache Manager to free the requested
+   * bytes.
+   *
+   * @param bytesNeeded
+   */
+  @VisibleForTesting
+  public void evictLazyPersistBlocks(long bytesNeeded) {
+    try {
+      ((LazyWriter) lazyWriter.getRunnable()).evictBlocks(bytesNeeded);
+    } catch(IOException ioe) {
+      LOG.info("Ignoring exception ", ioe);
+    }
+  }
+
+  /**
+   * Attempt to reserve the given amount of memory with the cache Manager.
+   * @param bytesNeeded
+   * @return
+   */
+  boolean reserveLockedMemory(long bytesNeeded) {
+    if (cacheManager.reserve(bytesNeeded) > 0) {
+      return true;
+    }
+
+    // Round up bytes needed to osPageSize and attempt to evict
+    // one or more blocks to free up the reservation.
+    bytesNeeded = cacheManager.roundUpPageSize(bytesNeeded);
+    evictLazyPersistBlocks(bytesNeeded);
+    return cacheManager.reserve(bytesNeeded) > 0;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 1f7bade..e1ce1b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -130,7 +130,6 @@ public class TestBalancer {
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
-    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
     LazyPersistTestCase.initCacheManipulator();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 5ce5cc6..ce29fc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Supplier;
+import org.apache.commons.lang.UnhandledException;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -37,6 +39,7 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -79,7 +83,6 @@ public abstract class LazyPersistTestCase {
 
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  protected static final int EVICTION_LOW_WATERMARK = 1;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
   private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
@@ -236,7 +239,6 @@ public abstract class LazyPersistTestCase {
       StorageType[] storageTypes,
       int ramDiskReplicaCapacity,
       long ramDiskStorageLimit,
-      long evictionLowWatermarkReplicas,
       long maxLockedMemory,
       boolean useSCR,
       boolean useLegacyBlockReaderLocal,
@@ -256,8 +258,6 @@ public abstract class LazyPersistTestCase {
                 HEARTBEAT_RECHECK_INTERVAL_MSEC);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
                 LAZY_WRITER_INTERVAL_SEC);
-    conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
-                evictionLowWatermarkReplicas * BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
     conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory);
 
@@ -389,12 +389,6 @@ public abstract class LazyPersistTestCase {
       return this;
     }
 
-    public ClusterWithRamDiskBuilder setEvictionLowWatermarkReplicas(
-        long evictionLowWatermarkReplicas) {
-      this.evictionLowWatermarkReplicas = evictionLowWatermarkReplicas;
-      return this;
-    }
-
     public ClusterWithRamDiskBuilder disableScrubber() {
       this.disableScrubber = true;
       return this;
@@ -403,8 +397,8 @@ public abstract class LazyPersistTestCase {
     public void build() throws IOException {
       LazyPersistTestCase.this.startUpCluster(
           numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
-          ramDiskStorageLimit, evictionLowWatermarkReplicas,
-          maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber);
+          ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+          disableScrubber);
     }
 
     private int numDatanodes = REPL_FACTOR;
@@ -415,7 +409,6 @@ public abstract class LazyPersistTestCase {
     private boolean hasTransientStorage = true;
     private boolean useScr = false;
     private boolean useLegacyBlockReaderLocal = false;
-    private long evictionLowWatermarkReplicas = EVICTION_LOW_WATERMARK;
     private boolean disableScrubber=false;
   }
 
@@ -513,4 +506,27 @@ public abstract class LazyPersistTestCase {
       e.printStackTrace();
     }
   }
+
+  protected void waitForMetric(final String metricName, final int expectedValue)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          final int currentValue = Integer.parseInt(jmx.getValue(metricName));
+          LOG.info("Waiting for " + metricName +
+                       " to reach value " + expectedValue +
+                       ", current value = " + currentValue);
+          return currentValue == expectedValue;
+        } catch (Exception e) {
+          throw new UnhandledException("Test failed due to unexpected exception", e);
+        }
+      }
+    }, 1000, Integer.MAX_VALUE);
+  }
+
+  protected void triggerEviction(DataNode dn) {
+    FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
+    fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
index 9ea4665..eef8f0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
@@ -28,9 +28,7 @@ import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.MetricsAsserts;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -103,25 +101,26 @@ public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
    * Verify that locked RAM is released when blocks are evicted from RAM disk.
    */
   @Test
-  public void testReleaseOnEviction()
-      throws IOException, TimeoutException, InterruptedException {
+  public void testReleaseOnEviction() throws Exception {
     getClusterBuilder().setNumDatanodes(1)
                        .setMaxLockedMemory(BLOCK_SIZE)
                        .setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1)
                        .build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    final FsDatasetImpl fsd =
+        (FsDatasetImpl) cluster.getDataNodes().get(0).getFSDataset();
 
-    Path path = new Path("/" + METHOD_NAME + ".dat");
-    makeTestFile(path, BLOCK_SIZE, true);
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
+    assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
 
-    // The block should get evicted soon since it pushes RAM disk free
-    // space below the threshold.
-    waitForLockedBytesUsed(fsd, 0);
+    // Wait until the replica is written to persistent storage.
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    MetricsRecordBuilder rb =
-        MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
-    MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb);
+    // Trigger eviction and verify locked bytes were released.
+    fsd.evictLazyPersistBlocks(Long.MAX_VALUE);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+    waitForLockedBytesUsed(fsd, 0);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index 018eaba..c89475a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -28,6 +29,8 @@ import java.io.IOException;
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
 public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
@@ -70,32 +73,50 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
     ensureFileReplicasOnStorageType(path, DEFAULT);
   }
 
+  @Test
+  public void testSynchronousEviction() throws Exception {
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Wait until the replica is written to persistent storage.
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
+
+    // Ensure that writing a new file to RAM DISK evicts the block
+    // for the previous one.
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+    makeTestFile(path2, BLOCK_SIZE, true);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
+  }
+
   /**
    * File can not fit in RamDisk even with eviction
    * @throws IOException
    */
   @Test
   public void testFallbackToDiskFull() throws Exception {
-    getClusterBuilder().setRamDiskReplicaCapacity(0).build();
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE / 2).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
     makeTestFile(path, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path, DEFAULT);
-
     verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
   }
 
   /**
    * File partially fit in RamDisk after eviction.
    * RamDisk can fit 2 blocks. Write a file with 5 blocks.
-   * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
+   * Expect 2 blocks on RamDisk and the rest on disk.
    * @throws IOException
    */
   @Test
   public void testFallbackToDiskPartial()
       throws IOException, InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+    getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -122,8 +143,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
 
     // Since eviction is asynchronous, depending on the timing of eviction
     // wrt writes, we may get 2 or less blocks on RAM disk.
-    assert(numBlocksOnRamDisk <= 2);
-    assert(numBlocksOnDisk >= 3);
+    assertThat(numBlocksOnRamDisk, is(2));
+    assertThat(numBlocksOnDisk, is(3));
   }
 
   /**
@@ -134,7 +155,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
    */
   @Test
   public void testRamDiskNotChosenByDefault() throws IOException {
-    getClusterBuilder().build();
+    getClusterBuilder().setStorageTypes(new StorageType[] {RAM_DISK, RAM_DISK})
+                       .build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index ee8aaf0..6b16066 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
@@ -38,18 +39,16 @@ import static org.junit.Assert.assertTrue;
 public class TestLazyWriter extends LazyPersistTestCase {
   @Test
   public void testLazyPersistBlocksAreSaved()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().build();
+    final int NUM_BLOCKS = 10;
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
     // Create a test file
-    makeTestFile(path, BLOCK_SIZE * 10, true);
+    makeTestFile(path, BLOCK_SIZE * NUM_BLOCKS, true);
     LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
+    waitForMetric("RamDiskBlocksLazyPersisted", NUM_BLOCKS);
     LOG.info("Verifying copy was saved to lazyPersist/");
 
     // Make sure that there is a saved copy of the replica on persistent
@@ -57,35 +56,22 @@ public class TestLazyWriter extends LazyPersistTestCase {
     ensureLazyPersistBlocksAreSaved(locatedBlocks);
   }
 
-  /**
-   * RamDisk eviction after lazy persist to disk.
-   * @throws Exception
-   */
   @Test
-  public void testRamDiskEviction() throws Exception {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK).build();
+  public void testSynchronousEviction() throws Exception {
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
-    final int SEED = 0xFADED;
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    // Wait until the replica is written to persistent storage.
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    // Create another file with a replica on RAM_DISK.
+    // Ensure that writing a new file to RAM DISK evicts the block
+    // for the previous one.
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
     makeTestFile(path2, BLOCK_SIZE, true);
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
-
-    // Ensure the first file was evicted to disk, the second is still on
-    // RAM_DISK.
-    ensureFileReplicasOnStorageType(path2, RAM_DISK);
-    ensureFileReplicasOnStorageType(path1, DEFAULT);
-
     verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
     verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
   }
@@ -98,8 +84,8 @@ public class TestLazyWriter extends LazyPersistTestCase {
    */
   @Test
   public void testRamDiskEvictionBeforePersist()
-      throws IOException, InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1).build();
+      throws Exception {
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
     Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -116,6 +102,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
 
     // Eviction should not happen for block of the first file that is not
     // persisted yet.
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 0);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
     ensureFileReplicasOnStorageType(path2, DEFAULT);
 
@@ -133,7 +120,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
   public void testRamDiskEvictionIsLru()
       throws Exception {
     final int NUM_PATHS = 5;
-    getClusterBuilder().setRamDiskReplicaCapacity(NUM_PATHS + EVICTION_LOW_WATERMARK).build();
+    getClusterBuilder().setMaxLockedMemory(NUM_PATHS * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path paths[] = new Path[NUM_PATHS * 2];
 
@@ -145,8 +132,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
       makeTestFile(paths[i], BLOCK_SIZE, true);
     }
 
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    waitForMetric("RamDiskBlocksLazyPersisted", NUM_PATHS);
 
     for (int i = 0; i < NUM_PATHS; ++i) {
       ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
@@ -227,16 +213,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
 
     makeTestFile(path, BLOCK_SIZE, true);
     LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
     // Delete after persist
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));
 
     assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
-
     verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
     verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
   }
@@ -248,7 +231,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
    */
   @Test
   public void testDfsUsageCreateDelete()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setRamDiskReplicaCapacity(4).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -261,8 +244,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
 
     assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
 
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
     long usedAfterPersist = fs.getUsed();
     assertThat(usedAfterPersist, is((long) BLOCK_SIZE));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
index 7c7ba64..2512588 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -39,13 +41,20 @@ import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Test Lazy persist behavior with short-circuit reads. These tests
+ * will be run on Linux only with Native IO enabled. The tests fake
+ * RAM_DISK storage using local disk.
+ */
 public class TestScrLazyPersistFiles extends LazyPersistTestCase {
 
   @BeforeClass
@@ -58,6 +67,10 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
     Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
         equalTo(true));
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+
+    final long osPageSize = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
+    Preconditions.checkState(BLOCK_SIZE >= osPageSize);
+    Preconditions.checkState(BLOCK_SIZE % osPageSize == 0);
   }
 
   @Rule
@@ -69,35 +82,27 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
    */
   @Test
   public void testRamDiskShortCircuitRead()
-    throws IOException, InterruptedException {
-    getClusterBuilder().setNumDatanodes(REPL_FACTOR)
-                       .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
-                       .setRamDiskStorageLimit(2 * BLOCK_SIZE - 1)
-                       .setUseScr(true)
-                       .build();
+      throws IOException, InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
+    // Create a file and wait till it is persisted.
     makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
     ensureFileReplicasOnStorageType(path, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
-    //assertThat(verifyReadRandomFile(path, BLOCK_SIZE, SEED), is(true));
-    FSDataInputStream fis = fs.open(path);
+    HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path);
 
     // Verify SCR read counters
     try {
-      fis = fs.open(path);
       byte[] buf = new byte[BUFFER_LENGTH];
       fis.read(0, buf, 0, BUFFER_LENGTH);
-      HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
       Assert.assertEquals(BUFFER_LENGTH,
-        dfsis.getReadStatistics().getTotalBytesRead());
+        fis.getReadStatistics().getTotalBytesRead());
       Assert.assertEquals(BUFFER_LENGTH,
-        dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+        fis.getReadStatistics().getTotalShortCircuitBytesRead());
     } finally {
       fis.close();
       fis = null;
@@ -111,106 +116,77 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
    * @throws InterruptedException
    */
   @Test
-  public void testRamDiskEvictionWithShortCircuitReadHandle()
-    throws IOException, InterruptedException {
-    // 5 replica + delta, SCR.
-    getClusterBuilder().setNumDatanodes(REPL_FACTOR)
-                       .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
-                       .setRamDiskStorageLimit(6 * BLOCK_SIZE - 1)
-                       .setEvictionLowWatermarkReplicas(3)
-                       .setUseScr(true)
-                       .build();
-
+  public void tesScrDuringEviction()
+      throws Exception {
+    getClusterBuilder().setUseScr(true).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-    final int SEED = 0xFADED;
 
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    // Create a file and wait till it is persisted.
+    makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    // However the block replica should not be evicted from RAM_DISK yet.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
-    // No eviction should happen as the free ratio is below the threshold
-    FSDataInputStream fis = fs.open(path1);
+    HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path1);
     try {
       // Keep and open read handle to path1 while creating path2
       byte[] buf = new byte[BUFFER_LENGTH];
       fis.read(0, buf, 0, BUFFER_LENGTH);
-
-      // Create the 2nd file that will trigger RAM_DISK eviction.
-      makeTestFile(path2, BLOCK_SIZE * 2, true);
-      ensureFileReplicasOnStorageType(path2, RAM_DISK);
+      triggerEviction(cluster.getDataNodes().get(0));
 
       // Ensure path1 is still readable from the open SCR handle.
-      fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH);
-      HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
-      Assert.assertEquals(2 * BUFFER_LENGTH,
-        dfsis.getReadStatistics().getTotalBytesRead());
-      Assert.assertEquals(2 * BUFFER_LENGTH,
-        dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+      fis.read(0, buf, 0, BUFFER_LENGTH);
+      assertThat(fis.getReadStatistics().getTotalBytesRead(),
+          is((long) 2 * BUFFER_LENGTH));
+      assertThat(fis.getReadStatistics().getTotalShortCircuitBytesRead(),
+          is((long) 2 * BUFFER_LENGTH));
     } finally {
       IOUtils.closeQuietly(fis);
     }
-
-    // After the open handle is closed, path1 should be evicted to DISK.
-    triggerBlockReport();
-    ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
   @Test
-  public void testShortCircuitReadAfterEviction()
-      throws IOException, InterruptedException {
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testScrAfterEviction()
+      throws IOException, InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(false)
                        .build();
     doShortCircuitReadAfterEvictionTest();
   }
 
   @Test
-  public void testLegacyShortCircuitReadAfterEviction()
-      throws IOException, InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testLegacyScrAfterEviction()
+      throws IOException, InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(true)
                        .build();
     doShortCircuitReadAfterEvictionTest();
+
+    // In the implementation of legacy short-circuit reads, any failure is
+    // trapped silently, reverts back to a remote read, and also disables all
+    // subsequent legacy short-circuit reads in the ClientContext.
+    // Assert that it didn't get disabled.
+    ClientContext clientContext = client.getClientContext();
+    Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
   }
 
   private void doShortCircuitReadAfterEvictionTest() throws IOException,
-      InterruptedException {
+      InterruptedException, TimeoutException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
     final int SEED = 0xFADED;
     makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
     // Verify short-circuit read from RAM_DISK.
-    ensureFileReplicasOnStorageType(path1, RAM_DISK);
     File metaFile = cluster.getBlockMetadataFile(0,
         DFSTestUtil.getFirstBlock(fs, path1));
     assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
     assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
 
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
-    // Verify short-circuit read from RAM_DISK once again.
-    ensureFileReplicasOnStorageType(path1, RAM_DISK);
-    metaFile = cluster.getBlockMetadataFile(0,
-        DFSTestUtil.getFirstBlock(fs, path1));
-    assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
-    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
-    // Create another file with a replica on RAM_DISK, which evicts the first.
-    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
+    triggerEviction(cluster.getDataNodes().get(0));
 
     // Verify short-circuit read still works from DEFAULT storage.  This time,
     // we'll have a checksum written during lazy persistence.
@@ -219,54 +195,35 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
         DFSTestUtil.getFirstBlock(fs, path1));
     assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
     assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
-    // In the implementation of legacy short-circuit reads, any failure is
-    // trapped silently, reverts back to a remote read, and also disables all
-    // subsequent legacy short-circuit reads in the ClientContext.  If the test
-    // uses legacy, then assert that it didn't get disabled.
-    ClientContext clientContext = client.getClientContext();
-    if (clientContext.getUseLegacyBlockReaderLocal()) {
-      Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
-    }
   }
 
   @Test
-  public void testShortCircuitReadBlockFileCorruption() throws IOException,
-      InterruptedException {
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testScrBlockFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(false)
                        .build();
     doShortCircuitReadBlockFileCorruptionTest();
   }
 
   @Test
-  public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
-      InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testLegacyScrBlockFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(true)
                        .build();
     doShortCircuitReadBlockFileCorruptionTest();
   }
 
   public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
-      InterruptedException {
+      InterruptedException, TimeoutException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
-    final int SEED = 0xFADED;
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
-    // Create another file with a replica on RAM_DISK, which evicts the first.
-    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
+    triggerEviction(cluster.getDataNodes().get(0));
 
     // Corrupt the lazy-persisted block file, and verify that checksum
     // verification catches it.
@@ -277,42 +234,32 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
   }
 
   @Test
-  public void testShortCircuitReadMetaFileCorruption() throws IOException,
-      InterruptedException {
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testScrMetaFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(false)
                        .build();
     doShortCircuitReadMetaFileCorruptionTest();
   }
 
   @Test
-  public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
-      InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testLegacyScrMetaFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(true)
                        .build();
     doShortCircuitReadMetaFileCorruptionTest();
   }
 
   public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
-      InterruptedException {
+      InterruptedException, TimeoutException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
-    final int SEED = 0xFADED;
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
-    // Create another file with a replica on RAM_DISK, which evicts the first.
-    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
+    triggerEviction(cluster.getDataNodes().get(0));
 
     // Corrupt the lazy-persisted checksum file, and verify that checksum
     // verification catches it.


Mime
View raw message