hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject [19/34] git commit: HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
Date Fri, 17 Oct 2014 21:45:06 GMT
HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bf1b84ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bf1b84ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bf1b84ab

Branch: refs/heads/branch-2
Commit: bf1b84abe1a6ee5a740978b9e37b276ed61567b6
Parents: f6903ca
Author: arp <arp@apache.org>
Authored: Sat Sep 20 13:25:23 2014 -0700
Committer: Jitendra Pandey <Jitendra@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:02 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   3 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  11 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  97 ++++---
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  14 +-
 .../datanode/fsdataset/impl/FsVolumeList.java   |   4 +-
 .../fsdataset/impl/LazyWriteReplicaTracker.java | 268 -------------------
 .../impl/RamDiskReplicaLruTracker.java          | 208 ++++++++++++++
 .../fsdataset/impl/RamDiskReplicaTracker.java   | 245 +++++++++++++++++
 .../fsdataset/impl/FsDatasetTestUtil.java       |   2 +-
 .../fsdataset/impl/TestLazyPersistFiles.java    |  47 ++--
 11 files changed, 556 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f5bba86..b5c7fed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.web.AuthFilter;
 import org.apache.hadoop.http.HttpConfig;
 
@@ -129,6 +130,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
   public static final String  DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
   public static final int     DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+  public static final String  DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
+  public static final Class<RamDiskReplicaLruTracker>  DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
   public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
   public static final int     DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
   public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bf09899..f10be62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2088,7 +2088,8 @@ public class DataNode extends ReconfigurableBase
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
     }
-    if (blockScanner != null) {
+    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    if (blockScanner != null && !volume.isTransientStorage()) {
       blockScanner.addBlock(block);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 06d60b1..bfa1772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -292,9 +292,9 @@ class BlockPoolSlice {
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
    */
-  File activateSavedReplica(Block b, File blockFile) throws IOException {
+  File activateSavedReplica(Block b, File metaFile, File blockFile)
+      throws IOException {
     final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
-    final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
     FileUtils.moveFile(blockFile, targetBlockFile);
@@ -313,7 +313,7 @@ class BlockPoolSlice {
 
     
   void getVolumeMap(ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker lazyWriteReplicaMap)
       throws IOException {
     // Recover lazy persist replicas, they will be added to the volumeMap
     // when we scan the finalized directory.
@@ -410,7 +410,7 @@ class BlockPoolSlice {
    *                    false if the directory has rbw replicas
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir,
-                        final LazyWriteReplicaTracker lazyWriteReplicaMap,
+                        final RamDiskReplicaTracker lazyWriteReplicaMap,
                         boolean isFinalized)
       throws IOException {
     File files[] = FileUtil.listFiles(dir);
@@ -487,7 +487,8 @@ class BlockPoolSlice {
       // it is in the lazyWriteReplicaMap so it can be persisted
       // eventually.
       if (newReplica.getVolume().isTransientStorage()) {
-        lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
+        lazyWriteReplicaMap.addReplica(bpid, blockId,
+                                       (FsVolumeImpl) newReplica.getVolume());
       } else {
         lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 83d94a3..11322a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -30,7 +30,6 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -88,6 +87,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -159,7 +159,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid);
+    File blockfile = getFile(bpid, blkid, false);
     if (blockfile == null) {
       return null;
     }
@@ -219,7 +219,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
-  final LazyWriteReplicaTracker lazyWriteReplicaTracker;
+  final RamDiskReplicaTracker ramDiskReplicaTracker;
 
   private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
 
@@ -263,7 +263,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
     volumeMap = new ReplicaMap(this);
-    lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
+    ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -298,7 +298,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
-    fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker);
+    fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     volumeMap.addAll(tempVolumeMap);
     volumes.addVolume(fsVolume);
@@ -326,7 +326,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (final String bpid : bpids) {
       try {
         fsVolume.addBlockPool(bpid, this.conf);
-        fsVolume.getVolumeMap(bpid, tempVolumeMap);
+        fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
       } catch (IOException e) {
         LOG.warn("Caught exception when adding " + fsVolume +
             ". Will throw later.", e);
@@ -586,12 +586,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * checking that it exists. This should be used when the
    * next operation is going to open the file for read anyway,
    * and thus the exists check is redundant.
+   *
+   * @param touch if true then update the last access timestamp of the
+   *              block. Currently used for blocks on transient storage.
    */
-  private File getBlockFileNoExistsCheck(ExtendedBlock b)
+  private File getBlockFileNoExistsCheck(ExtendedBlock b,
+                                         boolean touch)
       throws IOException {
     final File f;
     synchronized(this) {
-      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
+      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
       throw new IOException("Block " + b + " is not valid");
@@ -602,7 +606,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
-    File blockFile = getBlockFileNoExistsCheck(b);
+    File blockFile = getBlockFileNoExistsCheck(b, true);
     if (isNativeIOAvailable) {
       return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
     } else {
@@ -1240,7 +1244,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
 
       if (v.isTransientStorage()) {
-        lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+        ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -1265,7 +1269,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.warn("Block " + b + " unfinalized and removed. " );
       }
       if (replicaInfo.getVolume().isTransientStorage()) {
-        lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
       }
     }
   }
@@ -1411,7 +1415,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //Should we check for metadata file too?
     final File f;
     synchronized(this) {
-      f = getFile(bpid, blockId);
+      f = getFile(bpid, blockId, false);
     }
     
     if(f != null ) {
@@ -1496,7 +1500,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       if (v.isTransientStorage()) {
-        lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+        ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
       }
 
       // If a DFSClient has the replica in its cache of short-circuit file
@@ -1628,7 +1632,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized boolean contains(final ExtendedBlock block) {
     final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId) != null;
+    return getFile(block.getBlockPoolId(), blockId, false) != null;
   }
 
   /**
@@ -1637,9 +1641,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param blockId a block's id
    * @return on disk data file path; null if the replica does not exist
    */
-  File getFile(final String bpid, final long blockId) {
+  File getFile(final String bpid, final long blockId, boolean touch) {
     ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info != null) {
+      if (touch && info.getVolume().isTransientStorage()) {
+        ramDiskReplicaTracker.touch(bpid, blockId);
+      }
       return info.getBlockFile();
     }
     return null;    
@@ -1808,7 +1815,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             blockScanner.deleteBlock(bpid, new Block(blockId));
           }
           if (vol.isTransientStorage()) {
-            lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
+            ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
           }
           LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
@@ -1830,11 +1837,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
         final DataBlockScanner blockScanner = datanode.getBlockScanner();
-        if (blockScanner != null) {
-          blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
-        }
-        if (vol.isTransientStorage()) {
-          lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+        if (!vol.isTransientStorage()) {
+          if (blockScanner != null) {
+            blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+          }
+        } else {
+          ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
         }
         LOG.warn("Added missing block to memory " + diskBlockInfo);
         return;
@@ -2117,7 +2125,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
-    volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
+    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
   }
 
   @Override
@@ -2347,7 +2355,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
         }
 
-        lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+        ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
         bpSlice = targetVolume.getBlockPoolSlice(bpid);
         srcMeta = replicaInfo.getMetaFile();
         srcFile = replicaInfo.getBlockFile();
@@ -2359,7 +2367,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
 
       synchronized (FsDatasetImpl.this) {
-        lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
+        ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@@ -2374,21 +2382,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      * @return true if there is more work to be done, false otherwise.
      */
     private boolean saveNextReplica() {
-      LazyWriteReplicaTracker.ReplicaState replicaState = null;
+      RamDiskReplica block = null;
       boolean succeeded = false;
 
       try {
-        replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
-        if (replicaState != null) {
-          moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+        block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
+        if (block != null) {
+          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
         }
         succeeded = true;
       } catch(IOException ioe) {
-        LOG.warn("Exception saving replica " + replicaState, ioe);
+        LOG.warn("Exception saving replica " + block, ioe);
       } finally {
-        if (!succeeded && replicaState != null) {
-          LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
-          lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
+        if (!succeeded && block != null) {
+          LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
+          ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
         }
       }
 
@@ -2426,8 +2434,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
              transientFreeSpaceBelowThreshold()) {
-        LazyWriteReplicaTracker.ReplicaState replicaState =
-            lazyWriteReplicaTracker.getNextCandidateForEviction();
+        RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
 
         if (replicaState == null) {
           break;
@@ -2440,46 +2447,48 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo replicaInfo, newReplicaInfo;
         File blockFile, metaFile;
         long blockFileUsed, metaFileUsed;
+        final String bpid = replicaState.getBlockPoolId();
 
         synchronized (FsDatasetImpl.this) {
-          replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
           blockFile = replicaInfo.getBlockFile();
           metaFile = replicaInfo.getMetaFile();
           blockFileUsed = blockFile.length();
           metaFileUsed = metaFile.length();
-          lazyWriteReplicaTracker.discardReplica(replicaState, false);
+          ramDiskReplicaTracker.discardReplica(replicaState, false);
 
           // Move the replica from lazyPersist/ to finalized/ on target volume
           BlockPoolSlice bpSlice =
-              replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+              replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
           File newBlockFile = bpSlice.activateSavedReplica(
-              replicaInfo, replicaState.savedBlockFile);
+              replicaInfo, replicaState.getSavedMetaFile(),
+              replicaState.getSavedBlockFile());
 
           newReplicaInfo =
               new FinalizedReplica(replicaInfo.getBlockId(),
                                    replicaInfo.getBytesOnDisk(),
                                    replicaInfo.getGenerationStamp(),
-                                   replicaState.lazyPersistVolume,
+                                   replicaState.getLazyPersistVolume(),
                                    newBlockFile.getParentFile());
 
           // Update the volumeMap entry.
-          volumeMap.add(replicaState.bpid, newReplicaInfo);
+          volumeMap.add(bpid, newReplicaInfo);
         }
 
         // Before deleting the files from transient storage we must notify the
         // NN that the files are on the new storage. Else a blockReport from
         // the transient storage might cause the NN to think the blocks are lost.
         ExtendedBlock extendedBlock =
-            new ExtendedBlock(replicaState.bpid, newReplicaInfo);
+            new ExtendedBlock(bpid, newReplicaInfo);
         datanode.notifyNamenodeReceivedBlock(
             extendedBlock, null, newReplicaInfo.getStorageUuid());
 
         // Remove the old replicas from transient storage.
         if (blockFile.delete() || !blockFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
+          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
           if (metaFile.delete() || !metaFile.exists()) {
-            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
           }
         }
 
@@ -2500,7 +2509,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Sleep if we have no more work to do or if it looks like we are not
           // making any forward progress. This is to ensure that if all persist
           // operations are failing we don't keep retrying them in a tight loop.
-          if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
+          if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
             Thread.sleep(checkpointerInterval * 1000);
             numSuccessiveFailures = 0;
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e2efb2f..098ad09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -235,9 +235,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public void reserveSpaceForRbw(long bytesToReserve) {
     if (bytesToReserve != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
-      }
       reservedForRbw.addAndGet(bytesToReserve);
     }
   }
@@ -245,9 +242,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public void releaseReservedSpace(long bytesToRelease) {
     if (bytesToRelease != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
-      }
 
       long oldReservation, newReservation;
       do {
@@ -298,17 +292,17 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
     
   void getVolumeMap(ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     for(BlockPoolSlice s : bpSlices.values()) {
-      s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
+      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
     }
   }
   
   void getVolumeMap(String bpid, ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
-    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
+    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 82fe35f..837ddf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -121,7 +121,7 @@ class FsVolumeList {
   
   void getAllVolumesMap(final String bpid,
                         final ReplicaMap volumeMap,
-                        final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                        final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     long totalStartTime = Time.monotonicNow();
     final List<IOException> exceptions = Collections.synchronizedList(
@@ -134,7 +134,7 @@ class FsVolumeList {
             FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
                 bpid + " on volume " + v + "...");
             long startTime = Time.monotonicNow();
-            v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
+            v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
             long timeTaken = Time.monotonicNow() - startTime;
             FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
                 + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
deleted file mode 100644
index e8d9c5c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-
-import com.google.common.collect.TreeMultimap;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import java.io.File;
-import java.util.*;
-
-class LazyWriteReplicaTracker {
-
-  enum State {
-    IN_MEMORY,
-    LAZY_PERSIST_IN_PROGRESS,
-    LAZY_PERSIST_COMPLETE,
-  }
-
-  static class ReplicaState implements Comparable<ReplicaState> {
-
-    final String bpid;
-    final long blockId;
-    State state;
-
-    /**
-     * transient storage volume that holds the original replica.
-     */
-    final FsVolumeSpi transientVolume;
-
-    /**
-     * Persistent volume that holds or will hold the saved replica.
-     */
-    FsVolumeImpl lazyPersistVolume;
-    File savedMetaFile;
-    File savedBlockFile;
-
-    ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
-      this.bpid = bpid;
-      this.blockId = blockId;
-      this.transientVolume = transientVolume;
-      state = State.IN_MEMORY;
-      lazyPersistVolume = null;
-      savedMetaFile = null;
-      savedBlockFile = null;
-    }
-
-    void deleteSavedFiles() {
-      try {
-        if (savedBlockFile != null) {
-          savedBlockFile.delete();
-          savedBlockFile = null;
-        }
-
-        if (savedMetaFile != null) {
-          savedMetaFile.delete();
-          savedMetaFile = null;
-        }
-      } catch (Throwable t) {
-        // Ignore any exceptions.
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
-    }
-
-    @Override
-    public int hashCode() {
-      return bpid.hashCode() ^ (int) blockId;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (this == other) {
-        return true;
-      }
-
-      if (other == null || getClass() != other.getClass()) {
-        return false;
-      }
-
-      ReplicaState otherState = (ReplicaState) other;
-      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
-    }
-
-    @Override
-    public int compareTo(ReplicaState other) {
-      if (blockId == other.blockId) {
-        return 0;
-      } else if (blockId < other.blockId) {
-        return -1;
-      } else {
-        return 1;
-      }
-    }
-  }
-
-  final FsDatasetImpl fsDataset;
-
-  /**
-   * Map of blockpool ID to map of blockID to ReplicaInfo.
-   */
-  final Map<String, Map<Long, ReplicaState>> replicaMaps;
-
-  /**
-   * Queue of replicas that need to be written to disk.
-   * Stale entries are GC'd by dequeueNextReplicaToPersist.
-   */
-  final Queue<ReplicaState> replicasNotPersisted;
-
-  /**
-   * Queue of replicas in the order in which they were persisted.
-   * We'll dequeue them in the same order.
-   * We can improve the eviction scheme later.
-   * Stale entries are GC'd by getNextCandidateForEviction.
-   */
-  final Queue<ReplicaState> replicasPersisted;
-
-  LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
-    this.fsDataset = fsDataset;
-    replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
-    replicasNotPersisted = new LinkedList<ReplicaState>();
-    replicasPersisted = new LinkedList<ReplicaState>();
-  }
-
-  synchronized void addReplica(String bpid, long blockId,
-                               final FsVolumeSpi transientVolume) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-    if (map == null) {
-      map = new HashMap<Long, ReplicaState>();
-      replicaMaps.put(bpid, map);
-    }
-    ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
-    map.put(blockId, replicaState);
-    replicasNotPersisted.add(replicaState);
-  }
-
-  synchronized void recordStartLazyPersist(
-      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-    ReplicaState replicaState = map.get(blockId);
-    replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
-    replicaState.lazyPersistVolume = checkpointVolume;
-  }
-
-  /**
-   * @param bpid
-   * @param blockId
-   * @param savedFiles The saved meta and block files, in that order.
-   */
-  synchronized void recordEndLazyPersist(
-      final String bpid, final long blockId, final File[] savedFiles) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-    ReplicaState replicaState = map.get(blockId);
-
-    if (replicaState == null) {
-      throw new IllegalStateException("Unknown replica bpid=" +
-          bpid + "; blockId=" + blockId);
-    }
-    replicaState.state = State.LAZY_PERSIST_COMPLETE;
-    replicaState.savedMetaFile = savedFiles[0];
-    replicaState.savedBlockFile = savedFiles[1];
-
-    if (replicasNotPersisted.peek() == replicaState) {
-      // Common case.
-      replicasNotPersisted.remove();
-    } else {
-      // Should never occur in practice as lazy writer always persists
-      // the replica at the head of the queue before moving to the next
-      // one.
-      replicasNotPersisted.remove(replicaState);
-    }
-
-    replicasPersisted.add(replicaState);
-  }
-
-  synchronized ReplicaState dequeueNextReplicaToPersist() {
-    while (replicasNotPersisted.size() != 0) {
-      ReplicaState replicaState = replicasNotPersisted.remove();
-      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
-
-      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
-        return replicaState;
-      }
-
-      // The replica no longer exists, look for the next one.
-    }
-    return null;
-  }
-
-  synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
-    replicasNotPersisted.add(replicaState);
-  }
-
-  synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
-    replicasPersisted.add(replicaState);
-  }
-
-  synchronized int numReplicasNotPersisted() {
-    return replicasNotPersisted.size();
-  }
-
-  synchronized ReplicaState getNextCandidateForEviction() {
-    while (replicasPersisted.size() != 0) {
-      ReplicaState replicaState = replicasPersisted.remove();
-      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
-
-      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
-        return replicaState;
-      }
-
-      // The replica no longer exists, look for the next one.
-    }
-    return null;
-  }
-
-  void discardReplica(ReplicaState replicaState, boolean deleteSavedCopies) {
-    discardReplica(replicaState.bpid, replicaState.blockId, deleteSavedCopies);
-  }
-
-  /**
-   * Discard any state we are tracking for the given replica. This could mean
-   * the block is either deleted from the block space or the replica is no longer
-   * on transient storage.
-   *
-   * @param deleteSavedCopies true if we should delete the saved copies on
-   *                          persistent storage. This should be set by the
-   *                          caller when the block is no longer needed.
-   */
-  synchronized void discardReplica(
-      final String bpid, final long blockId,
-      boolean deleteSavedCopies) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-
-    if (map == null) {
-      return;
-    }
-
-    ReplicaState replicaState = map.get(blockId);
-
-    if (replicaState == null) {
-      return;
-    }
-
-    if (deleteSavedCopies) {
-      replicaState.deleteSavedFiles();
-    }
-    map.remove(blockId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
new file mode 100644
index 0000000..0899e70
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * An implementation of RamDiskReplicaTracker that uses an LRU
+ * eviction scheme.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
+
+  private class RamDiskReplicaLru extends RamDiskReplica {
+    long lastUsedTime;
+
+    private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
+      super(bpid, blockId, ramDiskVolume);
+    }
+  }
+
+  /**
+   * Map of blockpool ID to <map of blockID to ReplicaInfo>.
+   */
+  Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
+
+  /**
+   * Queue of replicas that need to be written to disk.
+   * Stale entries are GC'd by dequeueNextReplicaToPersist.
+   */
+  Queue<RamDiskReplicaLru> replicasNotPersisted;
+
+  /**
+   * Map of persisted replicas ordered by their last use times.
+   */
+  TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
+
+  RamDiskReplicaLruTracker() {
+    replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
+    replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
+    replicasPersisted = TreeMultimap.create();
+  }
+
+  @Override
+  synchronized void addReplica(final String bpid, final long blockId,
+                               final FsVolumeImpl transientVolume) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    if (map == null) {
+      map = new HashMap<Long, RamDiskReplicaLru>();
+      replicaMaps.put(bpid, map);
+    }
+    RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
+    map.put(blockId, ramDiskReplicaLru);
+    replicasNotPersisted.add(ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized void touch(final String bpid,
+                          final long blockId) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      return;
+    }
+
+    // Reinsert the replica with its new timestamp.
+    if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
+      ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+      replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    }
+  }
+
+  @Override
+  synchronized void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+    ramDiskReplicaLru.setLazyPersistVolume(checkpointVolume);
+  }
+
+  @Override
+  synchronized void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      throw new IllegalStateException("Unknown replica bpid=" +
+          bpid + "; blockId=" + blockId);
+    }
+    ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
+
+    if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
+      // Common case.
+      replicasNotPersisted.remove();
+    } else {
+      // Caller error? Fallback to O(n) removal.
+      replicasNotPersisted.remove(ramDiskReplicaLru);
+    }
+
+    ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+    replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
+    while (replicasNotPersisted.size() != 0) {
+      RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
+      Map<Long, RamDiskReplicaLru> replicaMap =
+          replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+      if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  @Override
+  synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
+    replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru getNextCandidateForEviction() {
+    Iterator it = replicasPersisted.values().iterator();
+    while (it.hasNext()) {
+      RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
+      it.remove();
+
+      Map<Long, RamDiskReplicaLru> replicaMap =
+          replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+      if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  /**
+   * Discard any state we are tracking for the given replica. This could mean
+   * the block is either deleted from the block space or the replica is no longer
+   * on transient storage.
+   *
+   * @param deleteSavedCopies true if we should delete the saved copies on
+   *                          persistent storage. This should be set by the
+   *                          caller when the block is no longer needed.
+   */
+  @Override
+  synchronized void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return;
+    }
+
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      return;
+    }
+
+    if (deleteSavedCopies) {
+      ramDiskReplicaLru.deleteSavedFiles();
+    }
+
+    map.remove(blockId);
+    replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+
+    // replicasNotPersisted will be lazily GC'ed.
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
new file mode 100644
index 0000000..03fc068
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.File;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class RamDiskReplicaTracker {
+
+  FsDatasetImpl fsDataset;
+
+  static class RamDiskReplica implements Comparable<RamDiskReplica>  {
+    private final String bpid;
+    private final long blockId;
+    private File savedBlockFile;
+    private File savedMetaFile;
+
+    /**
+     * RAM_DISK volume that holds the original replica.
+     */
+    final FsVolumeSpi ramDiskVolume;
+
+    /**
+     * Persistent volume that holds or will hold the saved replica.
+     */
+    FsVolumeImpl lazyPersistVolume;
+
+    RamDiskReplica(final String bpid, final long blockId,
+                   final FsVolumeImpl ramDiskVolume) {
+      this.bpid = bpid;
+      this.blockId = blockId;
+      this.ramDiskVolume = ramDiskVolume;
+      lazyPersistVolume = null;
+      savedMetaFile = null;
+      savedBlockFile = null;
+    }
+
+    long getBlockId() {
+      return blockId;
+    }
+
+    String getBlockPoolId() {
+      return bpid;
+    }
+
+    FsVolumeImpl getLazyPersistVolume() {
+      return lazyPersistVolume;
+    }
+
+    void setLazyPersistVolume(FsVolumeImpl volume) {
+      Preconditions.checkState(!volume.isTransientStorage());
+      this.lazyPersistVolume = volume;
+    }
+
+    File getSavedBlockFile() {
+      return savedBlockFile;
+    }
+
+    File getSavedMetaFile() {
+      return savedMetaFile;
+    }
+
+    /**
+     * Record the saved meta and block files on the given volume.
+     *
+     * @param files Meta and block files, in that order.
+     */
+    void recordSavedBlockFiles(File[] files) {
+      this.savedMetaFile = files[0];
+      this.savedBlockFile = files[1];
+    }
+
+    @Override
+    public int hashCode() {
+      return bpid.hashCode() ^ (int) blockId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      RamDiskReplica otherState = (RamDiskReplica) other;
+      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
+    }
+
+    // Delete the saved meta and block files. Failure to delete can be
+    // ignored, the directory scanner will retry the deletion later.
+    void deleteSavedFiles() {
+      try {
+        if (savedBlockFile != null) {
+          savedBlockFile.delete();
+          savedBlockFile = null;
+        }
+
+        if (savedMetaFile != null) {
+          savedMetaFile.delete();
+          savedMetaFile = null;
+        }
+      } catch (Throwable t) {
+        // Ignore any exceptions.
+      }
+    }
+
+    @Override
+    public int compareTo(RamDiskReplica other) {
+      int bpidResult = bpid.compareTo(other.bpid);
+      if (bpidResult == 0)
+        if (blockId == other.blockId) {
+          return 0;
+        } else if (blockId < other.blockId) {
+          return -1;
+        } else {
+          return 1;
+        }
+      return bpidResult;
+    }
+
+    @Override
+    public String toString() {
+      return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
+    }
+  }
+
+  /**
+   * Get an instance of the configured RamDiskReplicaTracker based on the
+   * the configuration property
+   * {@link org.apache.hadoop.hdfs.DFSConfigKeys#DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY}.
+   *
+   * @param conf the configuration to be used
+   * @param dataset the FsDataset object.
+   * @return an instance of RamDiskReplicaTracker
+   */
+  static RamDiskReplicaTracker getInstance(final Configuration conf,
+                                           final FsDatasetImpl fsDataset) {
+    final Class<? extends RamDiskReplicaTracker> trackerClass = conf.getClass(
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT,
+        RamDiskReplicaTracker.class);
+    final RamDiskReplicaTracker tracker = ReflectionUtils.newInstance(
+        trackerClass, conf);
+    tracker.initialize(fsDataset);
+    return tracker;
+  }
+
+  void initialize(final FsDatasetImpl fsDataset) {
+    this.fsDataset = fsDataset;
+  }
+
+  /**
+   * Start tracking a new finalized replica on RAM disk.
+   *
+   * @param transientVolume RAM disk volume that stores the replica.
+   */
+  abstract void addReplica(final String bpid, final long blockId,
+                           final FsVolumeImpl transientVolume);
+
+  /**
+   * Invoked when a replica is opened by a client. This may be used as
+   * a heuristic by the eviction scheme.
+   */
+  abstract void touch(final String bpid, final long blockId);
+
+  /**
+   * Get the next replica to write to persistent storage.
+   */
+  abstract RamDiskReplica dequeueNextReplicaToPersist();
+
+  /**
+   * Invoked if a replica that was previously dequeued for persistence
+   * could not be successfully persisted. Add it back so it can be retried
+   * later.
+   */
+  abstract void reenqueueReplicaNotPersisted(
+      final RamDiskReplica ramDiskReplica);
+
+  /**
+   * Invoked when the Lazy persist operation is started by the DataNode.
+   * @param checkpointVolume
+   */
+  abstract void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume);
+
+  /**
+   * Invoked when the Lazy persist operation is complete.
+   *
+   * @param savedFiles The saved meta and block files, in that order.
+   */
+  abstract void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles);
+
+  /**
+   * Return a candidate replica to remove from RAM Disk. The exact replica
+   * to be returned may depend on the eviction scheme utilized.
+   *
+   * @return
+   */
+  abstract RamDiskReplica getNextCandidateForEviction();
+
+  /**
+   * Return the number of replicas pending persistence to disk.
+   */
+  abstract int numReplicasNotPersisted();
+
+  /**
+   * Discard all state we are tracking for the given replica.
+   */
+  abstract void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies);
+
+  void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+    discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index 48ddcc2..f9e30e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 public class FsDatasetTestUtil {
 
   public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
-    return ((FsDatasetImpl)fsd).getFile(bpid, bid);
+    return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
   }
 
   public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf1b84ab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 777779f..95404b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -71,7 +71,7 @@ public class TestLazyPersistFiles {
   private static final int THREADPOOL_SIZE = 10;
 
   private static final short REPL_FACTOR = 1;
-  private static final int BLOCK_SIZE = 10485760;   // 10 MB
+  private static final int BLOCK_SIZE = 5 * 1024 * 1024;
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@@ -449,34 +449,51 @@ public class TestLazyPersistFiles {
    * @throws InterruptedException
    */
   @Test (timeout=300000)
-  public void testRamDiskEvictionLRU()
+  public void testRamDiskEvictionIsLru()
     throws IOException, InterruptedException {
-    startUpCluster(true, 3);
+    final int NUM_PATHS = 5;
+    startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    final int NUM_PATHS = 6;
-    Path paths[] = new Path[NUM_PATHS];
+    Path paths[] = new Path[NUM_PATHS * 2];
 
-    for (int i = 0; i < NUM_PATHS; i++) {
+    for (int i = 0; i < paths.length; i++) {
       paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
     }
 
-    // No eviction for the first half of files
-    for (int i = 0; i < NUM_PATHS/2; i++) {
+    for (int i = 0; i < NUM_PATHS; i++) {
       makeTestFile(paths[i], BLOCK_SIZE, true);
-      ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
     }
 
-    // Lazy persist writer persists the first half of files
+    // Sleep for a short time to allow the lazy writer thread to do its job.
     Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
 
-    // Create the second half of files with eviction upon each create.
-    for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
-      makeTestFile(paths[i], BLOCK_SIZE, true);
+    for (int i = 0; i < NUM_PATHS; ++i) {
       ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+    }
+
+    // Open the files for read in a random order.
+    ArrayList<Integer> indexes = new ArrayList<Integer>(NUM_PATHS);
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      indexes.add(i);
+    }
+    Collections.shuffle(indexes);
 
-      // path[i-NUM_PATHS/2] is expected to be evicted by LRU
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      LOG.info("Touching file " + paths[indexes.get(i)]);
+      DFSTestUtil.readFile(fs, paths[indexes.get(i)]);
+    }
+
+    // Create an equal number of new files ensuring that the previous
+    // files are evicted in the same order they were read.
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
+      Thread.sleep(3000);
+      ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
+      ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
+      for (int j = i + 1; j < NUM_PATHS; ++j) {
+        ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
+      }
     }
   }
 


Mime
View raw message