hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [2/2] hadoop git commit: HDFS-8157. Writes to RAM DISK reserve locked memory for block files. (Arpit Agarwal)
Date Sat, 16 May 2015 16:18:58 GMT
HDFS-8157. Writes to RAM DISK reserve locked memory for block files. (Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/151f6d6d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/151f6d6d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/151f6d6d

Branch: refs/heads/branch-2
Commit: 151f6d6dcc527800fa7bb4fcc7a6721b7d5747e2
Parents: 802676e
Author: Arpit Agarwal <arp@apache.org>
Authored: Sat May 16 09:05:35 2015 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Sat May 16 09:05:50 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/datanode/ReplicaInPipeline.java |  11 +-
 .../hdfs/server/datanode/ReplicaInfo.java       |  12 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  |   8 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   2 +-
 .../impl/FsDatasetAsyncDiskService.java         |   7 +-
 .../datanode/fsdataset/impl/FsDatasetCache.java |  85 +++++++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 106 ++++++----
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  20 +-
 .../impl/RamDiskReplicaLruTracker.java          |  19 +-
 .../fsdataset/impl/RamDiskReplicaTracker.java   |  12 +-
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |   2 +-
 .../hdfs/server/balancer/TestBalancer.java      |   9 +-
 .../server/datanode/SimulatedFSDataset.java     |   4 +
 .../server/datanode/TestDirectoryScanner.java   |   9 +
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../datanode/extdataset/ExternalVolumeImpl.java |   4 +
 .../fsdataset/impl/LazyPersistTestCase.java     |  57 ++++--
 .../impl/TestLazyPersistLockedMemory.java       | 201 +++++++++++++++++++
 .../fsdataset/impl/TestWriteToReplica.java      |   4 +-
 20 files changed, 497 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3e3bab3..5df6200 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -227,6 +227,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8394. Move getAdditionalBlock() and related functionalities into a
     separate class. (wheat9)
 
+    HDFS-8157. Writes to RAM DISK reserve locked memory for block files.
+    (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index cc55f85..0eb143a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -51,7 +51,8 @@ public class ReplicaInPipeline extends ReplicaInfo
    * the bytes already written to this block.
    */
   private long bytesReserved;
-  
+  private final long originalBytesReserved;
+
   /**
    * Constructor for a zero length replica
    * @param blockId block id
@@ -97,6 +98,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.bytesOnDisk = len;
     this.writer = writer;
     this.bytesReserved = bytesToReserve;
+    this.originalBytesReserved = bytesToReserve;
   }
 
   /**
@@ -109,6 +111,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.bytesOnDisk = from.getBytesOnDisk();
     this.writer = from.writer;
     this.bytesReserved = from.bytesReserved;
+    this.originalBytesReserved = from.originalBytesReserved;
   }
 
   @Override
@@ -149,8 +152,14 @@ public class ReplicaInPipeline extends ReplicaInfo
   }
   
   @Override
+  public long getOriginalBytesReserved() {
+    return originalBytesReserved;
+  }
+
+  @Override
   public void releaseAllBytesReserved() {  // ReplicaInPipelineInterface
     getVolume().releaseReservedSpace(bytesReserved);
+    getVolume().releaseLockedMemory(bytesReserved);
     bytesReserved = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 940d3eb..136d8a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -218,7 +218,17 @@ abstract public class ReplicaInfo extends Block implements Replica {
   public long getBytesReserved() {
     return 0;
   }
-  
+
+  /**
+   * Number of bytes originally reserved for this replica. The actual
+   * reservation is adjusted as data is written to disk.
+   *
+   * @return the number of bytes originally reserved for this replica.
+   */
+  public long getOriginalBytesReserved() {
+    return 0;
+  }
+
    /**
    * Copy specified file into a temporary file. Then rename the
    * temporary file to the original name. This will cause any

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index e6ace44..d34022d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -73,6 +73,14 @@ public interface FsVolumeSpi {
   public boolean isTransientStorage();
 
   /**
+   * Release reserved memory for an RBW block written to transient storage
+   * i.e. RAM.
+   * bytesToRelease will be rounded down to the OS page size since locked
+   * memory reservation must always be a multiple of the page size.
+   */
+  public void releaseLockedMemory(long bytesToRelease);
+
+  /**
    * BlockIterator will return ExtendedBlock entries from a block pool in
    * this volume.  The entries will be returned in sorted order.<p/>
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5bc9c68..b3546d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -481,7 +481,7 @@ class BlockPoolSlice {
     // eventually.
     if (newReplica.getVolume().isTransientStorage()) {
       lazyWriteReplicaMap.addReplica(bpid, blockId,
-          (FsVolumeImpl) newReplica.getVolume());
+          (FsVolumeImpl) newReplica.getVolume(), 0);
     } else {
       lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index c1d3990..fdc9f83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.FileDescriptor;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -277,7 +276,8 @@ class FsDatasetAsyncDiskService {
 
     @Override
     public void run() {
-      long dfsBytes = blockFile.length() + metaFile.length();
+      final long blockLength = blockFile.length();
+      final long metaLength = metaFile.length();
       boolean result;
 
       result = (trashDirectory == null) ? deleteFiles() : moveFiles();
@@ -291,7 +291,8 @@ class FsDatasetAsyncDiskService {
         if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
           datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
         }
-        volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
+        volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
+        volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
         LOG.info("Deleted " + block.getBlockPoolId() + " "
             + block.getLocalBlock() + " file " + blockFile);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index e0df0f2..6f524b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -151,10 +151,15 @@ public class FsDatasetCache {
     /**
      * Round up a number to the operating system page size.
      */
-    public long round(long count) {
-      long newCount = 
-          (count + (osPageSize - 1)) / osPageSize;
-      return newCount * osPageSize;
+    public long roundUp(long count) {
+      return (count + osPageSize - 1) & (~(osPageSize - 1));
+    }
+
+    /**
+     * Round down a number to the operating system page size.
+     */
+    public long roundDown(long count) {
+      return count & (~(osPageSize - 1));
     }
   }
 
@@ -173,7 +178,7 @@ public class FsDatasetCache {
      *                 -1 if we failed.
      */
     long reserve(long count) {
-      count = rounder.round(count);
+      count = rounder.roundUp(count);
       while (true) {
         long cur = usedBytes.get();
         long next = cur + count;
@@ -195,10 +200,23 @@ public class FsDatasetCache {
      * @return         The new number of usedBytes.
      */
     long release(long count) {
-      count = rounder.round(count);
+      count = rounder.roundUp(count);
       return usedBytes.addAndGet(-count);
     }
-    
+
+    /**
+     * Release some bytes that we're using rounded down to the page size.
+     *
+     * @param count    The number of bytes to release.  We will round this
+     *                 down to the page size.
+     *
+     * @return         The new number of usedBytes.
+     */
+    long releaseRoundDown(long count) {
+      count = rounder.roundDown(count);
+      return usedBytes.addAndGet(-count);
+    }
+
     long get() {
       return usedBytes.get();
     }
@@ -341,6 +359,52 @@ public class FsDatasetCache {
   }
 
   /**
+   * Try to reserve more bytes.
+   *
+   * @param count    The number of bytes to add.  We will round this
+   *                 up to the page size.
+   *
+   * @return         The new number of usedBytes if we succeeded;
+   *                 -1 if we failed.
+   */
+  long reserve(long count) {
+    return usedBytesCount.reserve(count);
+  }
+
+  /**
+   * Release some bytes that we're using.
+   *
+   * @param count    The number of bytes to release.  We will round this
+   *                 up to the page size.
+   *
+   * @return         The new number of usedBytes.
+   */
+  long release(long count) {
+    return usedBytesCount.release(count);
+  }
+
+  /**
+   * Release some bytes that we're using rounded down to the page size.
+   *
+   * @param count    The number of bytes to release.  We will round this
+   *                 down to the page size.
+   *
+   * @return         The new number of usedBytes.
+   */
+  long releaseRoundDown(long count) {
+    return usedBytesCount.releaseRoundDown(count);
+  }
+
+  /**
+   * Get the OS page size.
+   *
+   * @return the OS page size.
+   */
+  long getOsPageSize() {
+    return usedBytesCount.rounder.osPageSize;
+  }
+
+  /**
    * Background worker that mmaps, mlocks, and checksums a block
    */
   private class CachingTask implements Runnable {
@@ -363,7 +427,7 @@ public class FsDatasetCache {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = usedBytesCount.reserve(length);
+      long newUsedBytes = reserve(length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
@@ -423,7 +487,7 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            usedBytesCount.release(length);
+            release(length);
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
                   + "bytes in total.", key, usedBytesCount.get());
@@ -502,8 +566,7 @@ public class FsDatasetCache {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes =
-          usedBytesCount.release(value.mappableBlock.getLength());
+      long newUsedBytes = release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 46c488f..167f0d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -321,8 +321,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     cacheManager = new FsDatasetCache(this);
 
     // Start the lazy writer once we have built the replica maps.
-    lazyWriter = new Daemon(new LazyWriter(conf));
-    lazyWriter.start();
+    // We need to start the lazy writer even if MaxLockedMemory is set to
+    // zero because we may have un-persisted replicas in memory from before
+    // the process restart. To minimize the chances of data loss we'll
+    // ensure they get written to disk now.
+    if (ramDiskReplicaTracker.numReplicasNotPersisted() > 0 ||
+        datanode.getDnConf().getMaxLockedMemory() > 0) {
+      lazyWriter = new Daemon(new LazyWriter(conf));
+      lazyWriter.start();
+    } else {
+      lazyWriter = null;
+    }
+
     registerMBean(datanode.getDatanodeUuid());
 
     // Add a Metrics2 Source Interface. This is same
@@ -1286,26 +1296,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       " and thus cannot be created.");
     }
     // create a new block
-    FsVolumeReference ref;
-    while (true) {
+    FsVolumeReference ref = null;
+
+    // Use ramdisk only if block size is a multiple of OS page size.
+    // This simplifies reservation for partially used replicas
+    // significantly.
+    if (allowLazyPersist &&
+        lazyWriter != null &&
+        b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
+        (cacheManager.reserve(b.getNumBytes())) > 0) {
       try {
-        if (allowLazyPersist) {
-          // First try to place the block on a transient volume.
-          ref = volumes.getNextTransientVolume(b.getNumBytes());
-          datanode.getMetrics().incrRamDiskBlocksWrite();
-        } else {
-          ref = volumes.getNextVolume(storageType, b.getNumBytes());
-        }
-      } catch (DiskOutOfSpaceException de) {
-        if (allowLazyPersist) {
-          datanode.getMetrics().incrRamDiskBlocksWriteFallback();
-          allowLazyPersist = false;
-          continue;
+        // First try to place the block on a transient volume.
+        ref = volumes.getNextTransientVolume(b.getNumBytes());
+        datanode.getMetrics().incrRamDiskBlocksWrite();
+      } catch(DiskOutOfSpaceException de) {
+        // Ignore the exception since we just fall back to persistent storage.
+        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+      } finally {
+        if (ref == null) {
+          cacheManager.release(b.getNumBytes());
         }
-        throw de;
       }
-      break;
     }
+
+    if (ref == null) {
+      ref = volumes.getNextVolume(storageType, b.getNumBytes());
+    }
+
     FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
     // create an rbw file to hold block in the designated volume
     File f;
@@ -1566,7 +1583,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
 
       if (v.isTransientStorage()) {
-        ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+        releaseLockedMemory(
+            replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
+            false);
+        ramDiskReplicaTracker.addReplica(
+            bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
         datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
       }
     }
@@ -1813,9 +1834,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
-   * We're informed that a block is no longer valid.  We
-   * could lazily garbage-collect the block, but why bother?
-   * just get rid of it.
+   * We're informed that a block is no longer valid. Delete it.
    */
   @Override // FsDatasetSpi
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
@@ -2066,8 +2085,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public void shutdown() {
     fsRunning = false;
 
-    ((LazyWriter) lazyWriter.getRunnable()).stop();
-    lazyWriter.interrupt();
+    if (lazyWriter != null) {
+      ((LazyWriter) lazyWriter.getRunnable()).stop();
+      lazyWriter.interrupt();
+    }
 
     if (mbeanName != null) {
       MBeans.unregister(mbeanName);
@@ -2085,11 +2106,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumes.shutdown();
     }
 
-    try {
-      lazyWriter.join();
-    } catch (InterruptedException ie) {
-      LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
-               "from LazyWriter.join");
+    if (lazyWriter != null) {
+      try {
+        lazyWriter.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
+                     "from LazyWriter.join");
+      }
     }
   }
 
@@ -2175,7 +2198,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
         if (vol.isTransientStorage()) {
-          ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+          long lockedBytesReserved =
+              cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ?
+                  diskBlockInfo.getNumBytes() : 0;
+          ramDiskReplicaTracker.addReplica(
+              bpid, blockId, (FsVolumeImpl) vol, lockedBytesReserved);
         }
         LOG.warn("Added missing block to memory " + diskBlockInfo);
         return;
@@ -2761,12 +2788,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     boolean ramDiskConfigured = ramDiskConfigured();
     // Add thread for DISK volume if RamDisk is configured
     if (ramDiskConfigured &&
+        asyncLazyPersistService != null &&
         !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
       asyncLazyPersistService.addVolume(v.getCurrentDir());
     }
 
     // Remove thread for DISK volume if RamDisk is not configured
     if (!ramDiskConfigured &&
+        asyncLazyPersistService != null &&
         asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
       asyncLazyPersistService.removeVolume(v.getCurrentDir());
     }
@@ -2791,9 +2820,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     // Remove the old replicas
     if (blockFile.delete() || !blockFile.exists()) {
-      ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+      FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
+      volume.onBlockFileDeletion(bpid, blockFileUsed);
       if (metaFile.delete() || !metaFile.exists()) {
-        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+        volume.onMetaFileDeletion(bpid, metaFileUsed);
       }
     }
 
@@ -2906,8 +2936,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     /**
-     * Attempt to evict one or more transient block replicas we have at least
-     * spaceNeeded bytes free.
+     * Attempt to evict one or more transient block replicas until we
+     * have at least spaceNeeded bytes free.
      */
     private void evictBlocks() throws IOException {
       int iterations = 0;
@@ -3057,5 +3087,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       s.add(blockId);
     }
   }
+
+  void releaseLockedMemory(long count, boolean roundup) {
+    if (roundup) {
+      cacheManager.release(count);
+    } else {
+      cacheManager.releaseRoundDown(count);
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 34bfa00..2f6b1c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -274,7 +274,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).getTmpDir();
   }
 
-  void decDfsUsed(String bpid, long value) {
+  void onBlockFileDeletion(String bpid, long value) {
+    decDfsUsed(bpid, value);
+    if (isTransientStorage()) {
+      dataset.releaseLockedMemory(value, true);
+    }
+  }
+
+  void onMetaFileDeletion(String bpid, long value) {
+    decDfsUsed(bpid, value);
+  }
+
+  private void decDfsUsed(String bpid, long value) {
     synchronized(dataset) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
@@ -428,6 +439,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
+  @Override
+  public void releaseLockedMemory(long bytesToRelease) {
+    if (isTransientStorage()) {
+      dataset.releaseLockedMemory(bytesToRelease, false);
+    }
+  }
+
   private enum SubdirFilter implements FilenameFilter {
     INSTANCE;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
index c01a6cf..b940736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -38,8 +38,10 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
   private class RamDiskReplicaLru extends RamDiskReplica {
     long lastUsedTime;
 
-    private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
-      super(bpid, blockId, ramDiskVolume);
+    private RamDiskReplicaLru(String bpid, long blockId,
+                              FsVolumeImpl ramDiskVolume,
+                              long lockedBytesReserved) {
+      super(bpid, blockId, ramDiskVolume, lockedBytesReserved);
     }
 
     @Override
@@ -70,20 +72,23 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
   TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
 
   RamDiskReplicaLruTracker() {
-    replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
-    replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
+    replicaMaps = new HashMap<>();
+    replicasNotPersisted = new LinkedList<>();
     replicasPersisted = TreeMultimap.create();
   }
 
   @Override
   synchronized void addReplica(final String bpid, final long blockId,
-                               final FsVolumeImpl transientVolume) {
+                               final FsVolumeImpl transientVolume,
+                               long lockedBytesReserved) {
     Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
     if (map == null) {
-      map = new HashMap<Long, RamDiskReplicaLru>();
+      map = new HashMap<>();
       replicaMaps.put(bpid, map);
     }
-    RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
+    RamDiskReplicaLru ramDiskReplicaLru =
+        new RamDiskReplicaLru(bpid, blockId, transientVolume,
+                              lockedBytesReserved);
     map.put(blockId, ramDiskReplicaLru);
     replicasNotPersisted.add(ramDiskReplicaLru);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
index 7507925..335ed70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -45,6 +45,7 @@ public abstract class RamDiskReplicaTracker {
     private final long blockId;
     private File savedBlockFile;
     private File savedMetaFile;
+    private long lockedBytesReserved;
 
     private long creationTime;
     protected AtomicLong numReads = new AtomicLong(0);
@@ -61,10 +62,12 @@ public abstract class RamDiskReplicaTracker {
     FsVolumeImpl lazyPersistVolume;
 
     RamDiskReplica(final String bpid, final long blockId,
-                   final FsVolumeImpl ramDiskVolume) {
+                   final FsVolumeImpl ramDiskVolume,
+                   long lockedBytesReserved) {
       this.bpid = bpid;
       this.blockId = blockId;
       this.ramDiskVolume = ramDiskVolume;
+      this.lockedBytesReserved = lockedBytesReserved;
       lazyPersistVolume = null;
       savedMetaFile = null;
       savedBlockFile = null;
@@ -168,6 +171,10 @@ public abstract class RamDiskReplicaTracker {
     public String toString() {
       return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
     }
+
+    public long getLockedBytesReserved() {
+      return lockedBytesReserved;
+    }
   }
 
   /**
@@ -201,7 +208,8 @@ public abstract class RamDiskReplicaTracker {
    * @param transientVolume RAM disk volume that stores the replica.
    */
   abstract void addReplica(final String bpid, final long blockId,
-                           final FsVolumeImpl transientVolume);
+                           final FsVolumeImpl transientVolume,
+                           long lockedBytesReserved);
 
   /**
    * Invoked when a replica is opened by a client. This may be used as

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 6674dca..5388688 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1583,7 +1583,7 @@ public class MiniDFSCluster {
       throw new IllegalStateException("Attempting to finalize "
                                       + "Namenode but it is not running");
     }
-    ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
+    ToolRunner.run(new DFSAdmin(conf), new String[]{"-finalizeUpgrade"});
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 5ee73d4..4b7d1fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -120,13 +121,16 @@ public class TestBalancer {
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
   }
 
-  static void initConfWithRamDisk(Configuration conf) {
+  static void initConfWithRamDisk(Configuration conf,
+                                  long ramDiskCapacity) {
     conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
     conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
     conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
+    LazyPersistTestCase.initCacheManipulator();
   }
 
   /* create a file with a length of <code>fileLen</code> */
@@ -1247,7 +1251,6 @@ public class TestBalancer {
     final int SEED = 0xFADED;
     final short REPL_FACT = 1;
     Configuration conf = new Configuration();
-    initConfWithRamDisk(conf);
 
     final int defaultRamDiskCapacity = 10;
     final long ramDiskStorageLimit =
@@ -1257,6 +1260,8 @@ public class TestBalancer {
       ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
       (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
 
+    initConfWithRamDisk(conf, ramDiskStorageLimit);
+
     cluster = new MiniDFSCluster
       .Builder(conf)
       .numDataNodes(1)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 2ac9416..778dd28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -492,6 +492,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     @Override
+    public void releaseLockedMemory(long bytesToRelease) {
+    }
+
+    @Override
     public void releaseReservedSpace(long bytesToRelease) {
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index e0b821a..68152fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
@@ -79,6 +80,8 @@ public class TestDirectoryScanner {
     CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
     CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
     CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+                 Long.MAX_VALUE);
   }
 
   /** create a file with a length of <code>fileLen</code> */
@@ -308,6 +311,7 @@ public class TestDirectoryScanner {
 
   @Test (timeout=300000)
   public void testRetainBlockOnPersistentStorage() throws Exception {
+    LazyPersistTestCase.initCacheManipulator();
     cluster = new MiniDFSCluster
         .Builder(CONF)
         .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@@ -349,6 +353,7 @@ public class TestDirectoryScanner {
 
   @Test (timeout=300000)
   public void testDeleteBlockOnTransientStorage() throws Exception {
+    LazyPersistTestCase.initCacheManipulator();
     cluster = new MiniDFSCluster
         .Builder(CONF)
         .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@@ -615,6 +620,10 @@ public class TestDirectoryScanner {
     }
 
     @Override
+    public void releaseLockedMemory(long bytesToRelease) {
+    }
+
+    @Override
     public BlockIterator newBlockIterator(String bpid, String name) {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 7a09630..58932fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -339,7 +339,7 @@ public class TestFsDatasetCache {
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(cacheBlocks(fileLocs[i]));
       total = DFSTestUtil.verifyExpectedCacheUsage(
-          rounder.round(total + fileSizes[i]), 4 * (i + 1), fsd);
+          rounder.roundUp(total + fileSizes[i]), 4 * (i + 1), fsd);
     }
 
     // nth file should hit a capacity exception
@@ -365,7 +365,7 @@ public class TestFsDatasetCache {
     int curCachedBlocks = 16;
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
-      long uncachedBytes = rounder.round(fileSizes[i]);
+      long uncachedBytes = rounder.roundUp(fileSizes[i]);
       total -= uncachedBytes;
       curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
       DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index b00d78f..8f6eb5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -83,6 +83,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
   }
 
   @Override
+  public void releaseLockedMemory(long bytesToRelease) {
+  }
+
+  @Override
   public BlockIterator newBlockIterator(String bpid, String name) {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 5dc86f7..5ce5cc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -23,16 +23,7 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -40,6 +31,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -68,6 +60,7 @@ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -80,8 +73,8 @@ public abstract class LazyPersistTestCase {
   static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
 
   static {
-    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
-    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.ALL);
+    DFSTestUtil.setNameNodeLogLevel(Level.DEBUG);
+    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
   }
 
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
@@ -95,6 +88,8 @@ public abstract class LazyPersistTestCase {
   protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
   protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
   protected static final short REPL_FACTOR = 1;
+  protected final long osPageSize =
+      NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
 
   protected MiniDFSCluster cluster;
   protected DistributedFileSystem fs;
@@ -194,7 +189,7 @@ public abstract class LazyPersistTestCase {
   protected final void makeRandomTestFile(Path path, long length,
       boolean isLazyPersist, long seed) throws IOException {
     DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
-      BLOCK_SIZE, REPL_FACTOR, seed, true);
+                           BLOCK_SIZE, REPL_FACTOR, seed, true);
   }
 
   protected final void makeTestFile(Path path, long length,
@@ -242,10 +237,12 @@ public abstract class LazyPersistTestCase {
       int ramDiskReplicaCapacity,
       long ramDiskStorageLimit,
       long evictionLowWatermarkReplicas,
+      long maxLockedMemory,
       boolean useSCR,
       boolean useLegacyBlockReaderLocal,
       boolean disableScrubber) throws IOException {
 
+    initCacheManipulator();
     Configuration conf = new Configuration();
     conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     if (disableScrubber) {
@@ -262,6 +259,7 @@ public abstract class LazyPersistTestCase {
     conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
                 evictionLowWatermarkReplicas * BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory);
 
     if (useSCR) {
       conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
@@ -311,6 +309,31 @@ public abstract class LazyPersistTestCase {
     LOG.info("Cluster startup complete");
   }
 
+  /**
+   * Use a dummy cache manipulator for testing.
+   */
+  public static void initCacheManipulator() {
+    NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.CacheManipulator() {
+      @Override
+      public void mlock(String identifier,
+                        ByteBuffer mmap, long length) throws IOException {
+        LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
+      }
+
+      @Override
+      public long getMemlockLimit() {
+        LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
+        return Long.MAX_VALUE;
+      }
+
+      @Override
+      public boolean verifyCanMlock() {
+        LOG.info("LazyPersistTestCase: fake return " + true);
+        return true;
+      }
+    });
+  }
+
   ClusterWithRamDiskBuilder getClusterBuilder() {
     return new ClusterWithRamDiskBuilder();
   }
@@ -344,6 +367,11 @@ public abstract class LazyPersistTestCase {
       return this;
     }
 
+    public ClusterWithRamDiskBuilder setMaxLockedMemory(long maxLockedMemory) {
+      this.maxLockedMemory = maxLockedMemory;
+      return this;
+    }
+
     public ClusterWithRamDiskBuilder setUseScr(boolean useScr) {
       this.useScr = useScr;
       return this;
@@ -376,13 +404,14 @@ public abstract class LazyPersistTestCase {
       LazyPersistTestCase.this.startUpCluster(
           numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
           ramDiskStorageLimit, evictionLowWatermarkReplicas,
-          useScr, useLegacyBlockReaderLocal,disableScrubber);
+          maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber);
     }
 
     private int numDatanodes = REPL_FACTOR;
     private StorageType[] storageTypes = null;
     private int ramDiskReplicaCapacity = -1;
     private long ramDiskStorageLimit = -1;
+    private long maxLockedMemory = Long.MAX_VALUE;
     private boolean hasTransientStorage = true;
     private boolean useScr = false;
     private boolean useLegacyBlockReaderLocal = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
new file mode 100644
index 0000000..9ea4665
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.fs.StorageType.DEFAULT;
+import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Verify that locked memory is used correctly when writing to replicas in
+ * memory.
+ */
+public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
+
+  /**
+   * RAM disk present but locked memory is set to zero. Placement should
+   * fall back to disk.
+   */
+  @Test
+  public void testWithNoLockedMemory() throws IOException {
+    getClusterBuilder().setNumDatanodes(1)
+                       .setMaxLockedMemory(0).build();
+
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, DEFAULT);
+  }
+
+  @Test
+  public void testReservation()
+      throws IOException, TimeoutException, InterruptedException {
+    getClusterBuilder().setNumDatanodes(1)
+                       .setMaxLockedMemory(BLOCK_SIZE).build();
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+    // Create a file and ensure the replica in RAM_DISK uses locked memory.
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, RAM_DISK);
+    assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
+  }
+
+  @Test
+  public void testReleaseOnFileDeletion()
+      throws IOException, TimeoutException, InterruptedException {
+    getClusterBuilder().setNumDatanodes(1)
+                       .setMaxLockedMemory(BLOCK_SIZE).build();
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, RAM_DISK);
+    assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
+
+    // Delete the file and ensure that the locked memory is released.
+    fs.delete(path, false);
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    waitForLockedBytesUsed(fsd, 0);
+  }
+
+  /**
+   * Verify that locked RAM is released when blocks are evicted from RAM disk.
+   */
+  @Test
+  public void testReleaseOnEviction()
+      throws IOException, TimeoutException, InterruptedException {
+    getClusterBuilder().setNumDatanodes(1)
+                       .setMaxLockedMemory(BLOCK_SIZE)
+                       .setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1)
+                       .build();
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    makeTestFile(path, BLOCK_SIZE, true);
+
+    // The block should get evicted soon since it pushes RAM disk free
+    // space below the threshold.
+    waitForLockedBytesUsed(fsd, 0);
+
+    MetricsRecordBuilder rb =
+        MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb);
+  }
+
+  /**
+   * Verify that locked bytes are correctly updated when a block is finalized
+   * at less than its max length.
+   */
+  @Test
+  public void testShortBlockFinalized()
+      throws IOException, TimeoutException, InterruptedException {
+    getClusterBuilder().setNumDatanodes(1).build();
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    makeTestFile(path, 1, true);
+    assertThat(fsd.getCacheUsed(), is(osPageSize));
+
+    // Delete the file and ensure locked RAM usage goes to zero.
+    fs.delete(path, false);
+    waitForLockedBytesUsed(fsd, 0);
+  }
+
+  /**
+   * Verify that locked bytes are correctly updated when the client goes
+   * away unexpectedly during a write.
+   */
+  @Test
+  public void testWritePipelineFailure()
+    throws IOException, TimeoutException, InterruptedException {
+    getClusterBuilder().setNumDatanodes(1).build();
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
+    // Write 1 byte to the file and kill the writer.
+    final FSDataOutputStream fos =
+        fs.create(path,
+                  FsPermission.getFileDefault(),
+                  createFlags,
+                  BUFFER_LENGTH,
+                  REPL_FACTOR,
+                  BLOCK_SIZE,
+                  null);
+
+    fos.write(new byte[1]);
+    fos.hsync();
+    DFSTestUtil.abortStream((DFSOutputStream) fos.getWrappedStream());
+    waitForLockedBytesUsed(fsd, osPageSize);
+
+    // Delete the file and ensure locked RAM goes to zero.
+    fs.delete(path, false);
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    waitForLockedBytesUsed(fsd, 0);
+  }
+
+  /**
+   * Wait until used locked byte count goes to the expected value.
+   * @throws TimeoutException after 300 seconds.
+   */
+  private void waitForLockedBytesUsed(final FsDatasetSpi<?> fsd,
+                                      final long expectedLockedBytes)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        long cacheUsed = fsd.getCacheUsed();
+        LOG.info("cacheUsed=" + cacheUsed + ", waiting for it to be " + expectedLockedBytes);
+        if (cacheUsed < 0) {
+          throw new IllegalStateException("cacheUsed unexpectedly negative");
+        }
+        return (cacheUsed == expectedLockedBytes);
+      }
+    }, 1000, 300000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/151f6d6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index d5664cf..a77184b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -204,7 +204,7 @@ public class TestWriteToReplica {
     long available = v.getCapacity()-v.getDfsUsed();
     long expectedLen = blocks[FINALIZED].getNumBytes();
     try {
-      v.decDfsUsed(bpid, -available);
+      v.onBlockFileDeletion(bpid, -available);
       blocks[FINALIZED].setNumBytes(expectedLen+100);
       dataSet.append(blocks[FINALIZED], newGS, expectedLen);
       Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
@@ -212,7 +212,7 @@ public class TestWriteToReplica {
       Assert.assertTrue(e.getMessage().startsWith(
           "Insufficient space for appending to "));
     }
-    v.decDfsUsed(bpid, available);
+    v.onBlockFileDeletion(bpid, available);
     blocks[FINALIZED].setNumBytes(expectedLen);
 
     newGS = blocks[RBW].getGenerationStamp()+1;


Mime
View raw message