hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject [05/34] git commit: HDFS-6926. DN support for saving replicas to persistent storage and evicting in-memory replicas. (Arpit Agarwal)
Date Fri, 17 Oct 2014 23:30:52 GMT
HDFS-6926. DN support for saving replicas to persistent storage and evicting in-memory replicas.
(Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e01a89b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e01a89b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e01a89b

Branch: refs/heads/branch-2.6
Commit: 8e01a89b618130b9757b63c1b9da657c3f74b91b
Parents: 34d0088
Author: arp <arp@apache.org>
Authored: Wed Aug 27 15:36:48 2014 -0700
Committer: Jitendra Pandey <Jitendra@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:49 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../server/datanode/BlockPoolSliceScanner.java  |   2 +-
 .../hdfs/server/datanode/DataStorage.java       |   1 +
 .../hdfs/server/datanode/ReplicaInPipeline.java |   2 +-
 .../AvailableSpaceVolumeChoosingPolicy.java     |   6 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   3 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  45 ++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 312 ++++++++++++++++++-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   6 +
 .../fsdataset/impl/LazyWriteReplicaTracker.java | 177 +++++++++++
 .../server/datanode/SimulatedFSDataset.java     |   5 +
 .../hdfs/server/namenode/TestAddBlockRetry.java |   1 +
 12 files changed, 548 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5eb449a..e12b01a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -127,6 +127,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+  public static final String  DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
+  public static final int     DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
   public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
     "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
@@ -229,6 +231,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final float   DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
   public static final String  DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
   public static final int     DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
+
+  public static final String  DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = "dfs.namenode.lazypersist.file.scrub.interval.sec";
+  public static final int     DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60;
   
   public static final String  DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
   public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index bbb67fc..61f1e7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -191,7 +191,7 @@ class BlockPoolSliceScanner {
         + hours + " hours for block pool " + bpid);
 
     // get the list of blocks and arrange them in random order
-    List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
+    List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
     Collections.shuffle(arr);
     
     long scanTime = -1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 1508be3..ce33d42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -68,6 +68,7 @@ public class DataStorage extends Storage {
   final static String STORAGE_DIR_DETACHED = "detach";
   public final static String STORAGE_DIR_RBW = "rbw";
   public final static String STORAGE_DIR_FINALIZED = "finalized";
+  public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist";
   public final static String STORAGE_DIR_TMP = "tmp";
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 08395aa..45862ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -250,7 +250,7 @@ public class ReplicaInPipeline extends ReplicaInfo
         }
       }
     } else {
-      // for create, we can use the requested checksum
+			// for create, we can use the requested checksum
       checksum = requestedChecksum;
     }
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index 235cd7b..d0d36ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -138,8 +138,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
       if (mostAvailableAmongLowVolumes < replicaSize ||
           random.nextFloat() < scaledPreferencePercent) {
         volume = roundRobinPolicyHighAvailable.chooseVolume(
-            highAvailableVolumes,
-            replicaSize);
+            highAvailableVolumes, replicaSize);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from high available space volumes for write of block size "
@@ -147,8 +146,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
         }
       } else {
         volume = roundRobinPolicyLowAvailable.chooseVolume(
-            lowAvailableVolumes,
-            replicaSize);
+            lowAvailableVolumes, replicaSize);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from low available space volumes for write of block size "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4a5580c..2bb2e7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -116,6 +116,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /** @return a list of finalized blocks for the given block pool. */
   public List<FinalizedReplica> getFinalizedBlocks(String bpid);
 
+  /** @return a list of finalized blocks for the given block pool. */
+  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
+
   /**
    * Check whether the in-memory block record matches the block on the disk,
    * and, in case that they are not matched, update the record or mark it

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 7839bec..179f3ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.util.Scanner;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
@@ -61,6 +62,7 @@ class BlockPoolSlice {
   private final File currentDir; // StorageDirectory/current/bpid/current
   // directory where finalized replicas are stored
   private final File finalizedDir;
+  private final File lazypersistDir;
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private static final String DU_CACHE_FILE = "dfsUsed";
@@ -85,12 +87,24 @@ class BlockPoolSlice {
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
     this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+    this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
     if (!this.finalizedDir.exists()) {
       if (!this.finalizedDir.mkdirs()) {
         throw new IOException("Failed to mkdirs " + this.finalizedDir);
       }
     }
 
+    // Delete all checkpointed replicas on startup.
+    // TODO: We can move checkpointed replicas to the finalized dir and delete
+    //       the copy on RAM_DISK. For now we take the simpler approach.
+
+    FileUtil.fullyDelete(lazypersistDir);
+    if (!this.lazypersistDir.exists()) {
+      if (!this.lazypersistDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + this.lazypersistDir);
+      }
+    }
+
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
     // in the future, we might want to do some sort of datanode-local
@@ -142,6 +156,10 @@ class BlockPoolSlice {
     return finalizedDir;
   }
   
+  File getLazypersistDir() {
+    return lazypersistDir;
+  }
+
   File getRbwDir() {
     return rbwDir;
   }
@@ -258,12 +276,37 @@ class BlockPoolSlice {
     dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     return blockFile;
   }
-    
+
+  File lazyPersistReplica(Block b, File f) throws IOException {
+    File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
+    File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
+    dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
+    return blockFile;
+  }
+
+  /**
+   * Move a persisted replica from lazypersist directory to a subdirectory
+   * under finalized.
+   */
+  File activateSavedReplica(Block b, File blockFile) throws IOException {
+    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+    final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
+    final File targetBlockFile = new File(blockDir, blockFile.getName());
+    final File targetMetaFile = new File(blockDir, metaFile.getName());
+    FileUtils.moveFile(blockFile, targetBlockFile);
+    FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
+    FileUtils.moveFile(metaFile, targetMetaFile);
+    FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
+    return targetBlockFile;
+  }
+
   void checkDirs() throws DiskErrorException {
     DiskChecker.checkDirs(finalizedDir);
     DiskChecker.checkDir(tmpDir);
     DiskChecker.checkDir(rbwDir);
   }
+
+
     
   void getVolumeMap(ReplicaMap volumeMap) throws IOException {
     // add finalized replicas

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 701207d..1abc6f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
 import java.util.concurrent.Executor;
 
 import javax.management.NotCompliantMBeanException;
@@ -95,6 +96,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -118,7 +120,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-
   @Override // FsDatasetSpi
   public List<FsVolumeImpl> getVolumes() {
     return volumes.volumes;
@@ -211,11 +212,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   final FsVolumeList volumes;
   final Map<String, DatanodeStorage> storageMap;
   final FsDatasetAsyncDiskService asyncDiskService;
+  final Daemon lazyWriter;
   final FsDatasetCache cacheManager;
   private final Configuration conf;
   private final int validVolsRequired;
+  private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
+  final LazyWriteReplicaTracker lazyWriteReplicaTracker;
+
+  private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
+
 
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
@@ -225,6 +232,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
       ) throws IOException {
+    this.fsRunning = true;
     this.datanode = datanode;
     this.dataStorage = storage;
     this.conf = conf;
@@ -255,6 +263,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
     volumeMap = new ReplicaMap(this);
+    lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
+
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
         ReflectionUtils.newInstance(conf.getClass(
@@ -264,11 +274,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     volumes = new FsVolumeList(volsFailed, blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode);
 
+    // TODO: Initialize transientReplicaTracker from blocks on disk.
+
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
     }
 
     cacheManager = new FsDatasetCache(this);
+    lazyWriter = new Daemon(new LazyWriter(
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+                    DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
+    lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
   }
 
@@ -664,8 +680,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
   }
 
-  static File moveBlockFiles(Block b, File srcfile, File destdir
-      ) throws IOException {
+  static File moveBlockFiles(Block b, File srcfile, File destdir)
+      throws IOException {
     final File dstfile = new File(destdir, b.getBlockName());
     final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
     final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
@@ -688,6 +704,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return dstfile;
   }
 
+  static File copyBlockFiles(Block b, File srcfile, File destdir)
+      throws IOException {
+    final File dstfile = new File(destdir, b.getBlockName());
+    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
+    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    try {
+      FileUtils.copyFile(srcmeta, dstmeta);
+    } catch (IOException e) {
+      throw new IOException("Failed to copy meta file for " + b
+          + " from " + srcmeta + " to " + dstmeta, e);
+    }
+    try {
+      FileUtils.copyFile(srcfile, dstfile);
+    } catch (IOException e) {
+      throw new IOException("Failed to copy block file for " + b
+          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
+          + " and " + srcfile + " to " + dstfile);
+    }
+    return dstfile;
+  }
+
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     LOG.info("truncateBlock: blockFile=" + blockFile
@@ -950,6 +990,83 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  /**
+   * Attempt to evict one or more transient block replicas we have at least
+   * spaceNeeded bytes free.
+   *
+   * @return true if we were able to free up at least spaceNeeded bytes, false
+   *          otherwise.
+   */
+  private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
+      throws IOException {
+
+    boolean isAvailable = false;
+
+    LOG.info("Attempting to evict blocks from transient storage");
+
+    // Reverse the map so we can iterate in order of replica creation times,
+    // evicting oldest replicas one at a time until we have sufficient space.
+    TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
+        lazyWriteReplicaTracker.getLruMap();
+    int blocksEvicted = 0;
+
+    // TODO: It is really inefficient to do this with the Object lock held!
+    // TODO: This logic is here just for prototyping.
+    // TODO: We should replace it with proactive discard when ram_disk free space
+    // TODO:   falls below a low watermark. That way we avoid fs operations on the
+    // TODO:   hot path with the lock held.
+    synchronized (this) {
+      long currentTime = System.currentTimeMillis() / 1000;
+      for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
+        LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
+        LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
+                     "; block LMT=" + entry.getKey() +
+                     "; currentTime=" + currentTime);
+        ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
+        Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+        File blockFile = replicaInfo.getBlockFile();
+        File metaFile = replicaInfo.getMetaFile();
+        long used = blockFile.length() + metaFile.length();
+        lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
+
+        // Move the persisted replica to the finalized directory of
+        // the target volume.
+        BlockPoolSlice bpSlice =
+            lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
+        File newBlockFile = bpSlice.activateSavedReplica(
+            replicaInfo, lazyWriteReplica.savedBlockFile);
+
+        ReplicaInfo newReplicaInfo =
+            new FinalizedReplica(replicaInfo.getBlockId(),
+                                 replicaInfo.getBytesOnDisk(),
+                                 replicaInfo.getGenerationStamp(),
+                                 lazyWriteReplica.lazyPersistVolume,
+                                 newBlockFile.getParentFile());
+
+        // Update the volumeMap entry. This removes the old entry.
+        volumeMap.add(bpid, newReplicaInfo);
+
+        // Remove the old replicas.
+        blockFile.delete();
+        metaFile.delete();
+        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
+        ++blocksEvicted;
+
+        if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
+          LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
+          isAvailable = true;
+          break;
+        }
+
+        if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
+          break;
+        }
+      }
+    }
+
+    return isAvailable;
+  }
+
   @Override // FsDatasetSpi
   public synchronized ReplicaInPipeline createRbw(StorageType storageType,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException {
@@ -972,7 +1089,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
       } catch (DiskOutOfSpaceException de) {
         if (allowLazyPersist) {
-          allowLazyPersist = false;
+          if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
+            // Eviction did not work, we'll just fallback to DEFAULT storage.
+            LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
+                         " bytes for new block. Will fallback to DEFAULT " +
+                         "storage");
+            allowLazyPersist = false;
+          }
           continue;
         }
         throw de;
@@ -984,6 +1107,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+
     return newReplicaInfo;
   }
   
@@ -1129,7 +1253,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile(), 0);
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    
     return newReplicaInfo;
   }
 
@@ -1196,8 +1319,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File dest = v.addFinalizedBlock(
           bpid, replicaInfo, f, replicaInfo.getBytesReserved());
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+
+      if (v.isTransientStorage()) {
+        lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+
+        // Schedule a checkpoint.
+        ((LazyWriter) lazyWriter.getRunnable())
+            .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
+      }
     }
     volumeMap.add(bpid, newReplicaInfo);
+
     return newReplicaInfo;
   }
 
@@ -1217,6 +1349,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           replicaInfo.getMetaFile(), b.getLocalBlock())) {
         LOG.warn("Block " + b + " unfinalized and removed. " );
       }
+      if (replicaInfo.getVolume().isTransientStorage()) {
+        lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+      }
     }
   }
 
@@ -1313,6 +1448,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
+   * Get the list of finalized blocks from in-memory blockmap for a block pool.
+   */
+  @Override
+  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+    ArrayList<FinalizedReplica> finalized =
+        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+      if(!b.getVolume().isTransientStorage() &&
+         b.getState() == ReplicaState.FINALIZED) {
+        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+      }
+    }
+    return finalized;
+  }
+
+  /**
    * Check whether the given block is a valid one.
    * valid means finalized
    */
@@ -1429,6 +1580,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         volumeMap.remove(bpid, invalidBlks[i]);
       }
 
+      if (v.isTransientStorage()) {
+        lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+      }
+
       // If a DFSClient has the replica in its cache of short-circuit file
       // descriptors (and the client is using ShortCircuitShm), invalidate it.
       datanode.getShortCircuitRegistry().processBlockInvalidation(
@@ -1649,8 +1804,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public void shutdown() {
-    if (mbeanName != null)
+    fsRunning = false;
+
+    ((LazyWriter) lazyWriter.getRunnable()).stop();
+    lazyWriter.interrupt();
+
+    if (mbeanName != null) {
       MBeans.unregister(mbeanName);
+    }
     
     if (asyncDiskService != null) {
       asyncDiskService.shutdown();
@@ -1659,6 +1820,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if(volumes != null) {
       volumes.shutdown();
     }
+
+    try {
+      lazyWriter.join();
+    } catch (InterruptedException ie) {
+      LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
+               "from LazyWriter.join");
+    }
   }
 
   @Override // FSDatasetMBean
@@ -1691,7 +1859,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) {
+      File diskMetaFile, FsVolumeSpi vol) throws IOException {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     synchronized (this) {
@@ -1724,6 +1892,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           if (blockScanner != null) {
             blockScanner.deleteBlock(bpid, new Block(blockId));
           }
+          if (vol.isTransientStorage()) {
+            lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
+          }
           LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
           // Finally remove the metadata file
@@ -1747,6 +1918,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         if (blockScanner != null) {
           blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
         }
+        if (vol.isTransientStorage()) {
+          lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+        }
         LOG.warn("Added missing block to memory " + diskBlockInfo);
         return;
       }
@@ -1924,9 +2098,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final String bpid = oldBlock.getBlockPoolId();
     final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
     LOG.info("updateReplica: " + oldBlock
-        + ", recoveryId=" + recoveryId
-        + ", length=" + newlength
-        + ", replica=" + replica);
+                 + ", recoveryId=" + recoveryId
+                 + ", length=" + newlength
+                 + ", replica=" + replica);
 
     //check replica
     if (replica == null) {
@@ -2196,5 +2370,123 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
         nbytes, flags);
   }
+
+  /**
+   * Immutable (block pool ID, block ID) pair naming a single replica. It is
+   * the element type of the LazyWriter's pending-checkpoint queue.
+   */
+  private static class BlockIdPair {
+    final long blockId;
+    final String bpid;
+
+    BlockIdPair(final String poolId, final long id) {
+      blockId = id;
+      bpid = poolId;
+    }
+  }
+
+  /**
+   * Background task that persists queued replicas. Each pass dequeues one
+   * (block pool ID, block ID) pair and hands that replica to
+   * lazyPersistReplica on a volume chosen with StorageType.DEFAULT. When the
+   * queue is empty, or holds nothing but a block that just failed, the
+   * thread sleeps for checkpointerInterval seconds before trying again.
+   *
+   * The pending queue is guarded by this LazyWriter's own monitor. Dataset
+   * state is read under the enclosing FsDatasetImpl's monitor instead.
+   */
+  private class LazyWriter implements Runnable {
+    private volatile boolean shouldRun = true;
+
+    /** Sleep time between passes when there is no work, in seconds. */
+    final int checkpointerInterval;
+
+    final private Queue<BlockIdPair> blocksPendingCheckpoint;
+
+    public LazyWriter(final int checkpointerInterval) {
+      this.checkpointerInterval = checkpointerInterval;
+      blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
+    }
+
+    // Schedule a replica for writing to persistent storage.
+    public synchronized void addReplicaToLazyWriteQueue(
+        String bpid, long blockId) {
+      LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid +
+          " added to lazy writer queue");
+      blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
+    }
+
+    /**
+     * Save one replica to a newly chosen volume and record the start and
+     * end of the operation in lazyWriteReplicaTracker.
+     *
+     * @param bpid block pool ID of the replica.
+     * @param blockId block ID of the replica.
+     * @throws IOException if looking up the block, choosing a volume or
+     *         saving the replica fails.
+     */
+    private void moveReplicaToNewVolume(String bpid, long blockId)
+        throws IOException {
+
+      LOG.info("LazyWriter invoked to save blockId=" + blockId +
+          "; bpid=" + bpid);
+
+      final FsVolumeImpl targetVolume;
+      final Block block;
+      final File blockFile;
+
+      // Lock the dataset, not this LazyWriter: the lookups below read
+      // FsDatasetImpl state, and the LazyWriter monitor only guards the
+      // pending queue. Taking the queue monitor here would also acquire
+      // locks in the opposite order to callers that enqueue replicas while
+      // holding the dataset lock.
+      synchronized (FsDatasetImpl.this) {
+        block = getStoredBlock(bpid, blockId);
+
+        if (block == null) {
+          // The block was deleted before it could be checkpointed.
+          return;
+        }
+
+        blockFile = getFile(bpid, blockId);
+
+        // Pick a target volume for the block.
+        targetVolume = volumes.getNextVolume(
+            StorageType.DEFAULT, block.getNumBytes());
+      }
+
+      // The save itself runs outside any lock.
+      LOG.info("LazyWriter starting to save blockId=" + blockId +
+          "; bpid=" + bpid);
+      lazyWriteReplicaTracker.recordStartLazyPersist(
+          bpid, blockId, targetVolume);
+      File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
+                                        .lazyPersistReplica(block, blockFile);
+      lazyWriteReplicaTracker.recordEndLazyPersist(
+          bpid, blockId, savedBlockFile);
+      LOG.info("LazyWriter finished saving blockId=" + blockId +
+          "; bpid=" + bpid + " to file " + savedBlockFile);
+    }
+
+    /**
+     * Checkpoint a pending replica to persistent storage now.
+     * @return true if there is more work to be done, false otherwise.
+     */
+    private boolean saveNextReplica() {
+      BlockIdPair blockIdPair = null;
+      // Number of queued entries that do NOT count as immediate work: it is
+      // bumped to 1 when the block we just tried was put back for a retry.
+      int moreWorkThreshold = 0;
+
+      try {
+        synchronized (this) {
+          // Dequeue the next replica waiting to be checkpointed.
+          blockIdPair = blocksPendingCheckpoint.poll();
+          if (blockIdPair == null) {
+            LOG.info("LazyWriter has no blocks to persist. " +
+                "Thread going to sleep.");
+            return false;
+          }
+        }
+
+        // Move the replica outside the lock.
+        moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
+
+      } catch(IOException ioe) {
+        // If we failed, put the block on the queue and let a retry
+        // interval elapse before we try again so we don't try to keep
+        // checkpointing the same block in a tight loop.
+        LOG.warn("LazyWriter failed to save blockId=" + blockIdPair.blockId +
+            "; bpid=" + blockIdPair.bpid + ", will retry later", ioe);
+        synchronized (this) {
+          blocksPendingCheckpoint.add(blockIdPair);
+          ++moreWorkThreshold;
+        }
+      }
+
+      synchronized (this) {
+        return blocksPendingCheckpoint.size() > moreWorkThreshold;
+      }
+    }
+
+    /**
+     * Main loop: keep saving replicas while both fsRunning and shouldRun
+     * are set, sleeping between passes when there is nothing to do. Exits
+     * on interruption; any other exception is logged and the loop goes on.
+     */
+    @Override
+    public void run() {
+      while (fsRunning && shouldRun) {
+        try {
+          if (!saveNextReplica()) {
+            // 1000L forces long arithmetic so a large interval cannot
+            // overflow int and produce a negative sleep time.
+            Thread.sleep(checkpointerInterval * 1000L);
+          }
+        } catch (InterruptedException e) {
+          LOG.info("LazyWriter was interrupted, exiting");
+          break;
+        } catch (Exception e) {
+          LOG.error("Ignoring exception in LazyWriter:", e);
+        }
+      }
+    }
+
+    /** Ask the run loop to exit after its current pass. */
+    public void stop() {
+      shouldRun = false;
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 4c0b5f8..994344e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -380,6 +380,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
+    File lazypersistDir = new File(bpCurrentDir,
+        DataStorage.STORAGE_DIR_LAZY_PERSIST);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
     if (force) {
       FileUtil.fullyDelete(bpDir);
@@ -391,6 +393,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
           !FileUtil.fullyDelete(finalizedDir)) {
         throw new IOException("Failed to delete " + finalizedDir);
       }
+      if (!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
+          !FileUtil.fullyDelete(lazypersistDir)) {
+        throw new IOException("Failed to delete " + lazypersistDir);
+      }
       FileUtil.fullyDelete(tmpDir);
       for (File f : FileUtil.listFiles(bpCurrentDir)) {
         if (!f.delete()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
new file mode 100644
index 0000000..ae28f09
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.TreeMultimap;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tracks the lazy-persist state of every replica registered through
+ * {@link #addReplica}, per block pool, together with the time at which each
+ * replica finished being persisted. All accessors synchronize on this
+ * object.
+ */
+class LazyWriteReplicaTracker {
+
+  enum State {
+    IN_MEMORY,
+    LAZY_PERSIST_IN_PROGRESS,
+    LAZY_PERSIST_COMPLETE,
+  }
+
+  /**
+   * Tracking record for one replica. Identity (equals, hashCode and
+   * compareTo) is defined by the (bpid, blockId) pair only; the mutable
+   * fields do not take part.
+   */
+  static class ReplicaState implements Comparable<ReplicaState> {
+
+    final String bpid;
+    final long blockId;
+    State state;
+
+    /**
+     * transient storage volume that holds the original replica.
+     */
+    final FsVolumeImpl transientVolume;
+
+    /**
+     * Persistent volume that holds or will hold the saved replica.
+     */
+    FsVolumeImpl lazyPersistVolume;
+    File savedBlockFile;
+
+    ReplicaState(final String bpid, final long blockId,
+                 FsVolumeImpl transientVolume) {
+      this.bpid = bpid;
+      this.blockId = blockId;
+      this.transientVolume = transientVolume;
+      state = State.IN_MEMORY;
+      lazyPersistVolume = null;
+      savedBlockFile = null;
+    }
+
+    @Override
+    public int hashCode() {
+      return bpid.hashCode() ^ (int) blockId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      ReplicaState otherState = (ReplicaState) other;
+      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
+    }
+
+    /**
+     * Orders by blockId, then by bpid. The bpid tie-break keeps the
+     * ordering consistent with equals(): without it, sorted collections
+     * such as the TreeMultimap built by getLruMap() would treat replicas
+     * with the same blockId in different block pools as duplicates and
+     * drop one of them.
+     */
+    @Override
+    public int compareTo(ReplicaState other) {
+      if (blockId != other.blockId) {
+        return (blockId < other.blockId) ? -1 : 1;
+      }
+      return bpid.compareTo(other.bpid);
+    }
+  }
+
+  final FsDatasetImpl fsDataset;
+
+  /**
+   * Map of blockpool ID to map of blockID to ReplicaState.
+   */
+  final Map<String, Map<Long, ReplicaState>> replicaMaps;
+
+  /**
+   * A map of replica to persist complete time (seconds since the epoch) for
+   * transient blocks. This allows us to evict LRU blocks from transient
+   * storage. Protected by 'this' Object lock.
+   */
+  final Map<ReplicaState, Long> persistTimeMap;
+
+  LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
+    this.fsDataset = fsDataset;
+    replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
+    persistTimeMap = new HashMap<ReplicaState, Long>();
+  }
+
+  /**
+   * Build a snapshot of persistTimeMap inverted to map persist-complete
+   * time to the replicas persisted at that time.
+   *
+   * Synchronized because it iterates persistTimeMap, which the other
+   * methods mutate under this object's lock.
+   *
+   * @return a new multimap; later changes to the tracker do not affect it.
+   */
+  synchronized TreeMultimap<Long, ReplicaState> getLruMap() {
+    // TODO: This can be made more efficient.
+    TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
+    for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
+      reversedMap.put(entry.getValue(), entry.getKey());
+    }
+    return reversedMap;
+  }
+
+  /**
+   * Look up the tracking record for a replica. Callers must hold this
+   * object's lock.
+   *
+   * @return the record, or null if the block pool or the block is unknown.
+   */
+  private ReplicaState getReplicaState(final String bpid,
+                                       final long blockId) {
+    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+    return (map == null) ? null : map.get(blockId);
+  }
+
+  /**
+   * Start tracking a replica in state IN_MEMORY, replacing any record
+   * already stored for the same (bpid, blockId).
+   */
+  synchronized void addReplica(String bpid, long blockId,
+                               final FsVolumeImpl transientVolume) {
+    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+    if (map == null) {
+      map = new HashMap<Long, ReplicaState>();
+      replicaMaps.put(bpid, map);
+    }
+    map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
+  }
+
+  /**
+   * Mark a replica as LAZY_PERSIST_IN_PROGRESS and remember the volume it
+   * is being saved to.
+   *
+   * @throws IllegalStateException if the replica is not being tracked.
+   */
+  synchronized void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
+    ReplicaState replicaState = getReplicaState(bpid, blockId);
+
+    if (replicaState == null) {
+      throw new IllegalStateException("Unknown replica bpid=" +
+          bpid + "; blockId=" + blockId);
+    }
+    replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
+    replicaState.lazyPersistVolume = checkpointVolume;
+  }
+
+  /**
+   * Mark a replica as LAZY_PERSIST_COMPLETE, remember the saved file and
+   * stamp the completion time in persistTimeMap.
+   *
+   * @throws IllegalStateException if the replica is not being tracked.
+   */
+  synchronized void recordEndLazyPersist(
+      final String bpid, final long blockId, File savedBlockFile) {
+    ReplicaState replicaState = getReplicaState(bpid, blockId);
+
+    if (replicaState == null) {
+      throw new IllegalStateException("Unknown replica bpid=" +
+          bpid + "; blockId=" + blockId);
+    }
+    replicaState.state = State.LAZY_PERSIST_COMPLETE;
+    replicaState.savedBlockFile = savedBlockFile;
+    persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
+  }
+
+  /**
+   * Stop tracking a replica.
+   *
+   * @param force if true, an unknown replica is silently ignored and a
+   *        replica that has not finished persisting is discarded anyway.
+   * @throws IllegalStateException if force is false and the replica is
+   *         either unknown or not yet in state LAZY_PERSIST_COMPLETE.
+   */
+  synchronized void discardReplica(
+      final String bpid, final long blockId, boolean force) {
+    // The block pool itself may never have been registered, so the inner
+    // map can be null as well as the entry.
+    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+    ReplicaState replicaState = (map == null) ? null : map.get(blockId);
+
+    if (replicaState == null) {
+      if (force) {
+        return;
+      }
+      throw new IllegalStateException("Unknown replica bpid=" +
+          bpid + "; blockId=" + blockId);
+    }
+
+    if (replicaState.state != State.LAZY_PERSIST_COMPLETE && !force) {
+      throw new IllegalStateException("Discarding replica without " +
+          "saving it to disk bpid=" + bpid + "; blockId=" + blockId);
+    }
+
+    map.remove(blockId);
+    persistTimeMap.remove(replicaState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index eab599d..312da1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1122,6 +1122,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
+  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+    // Not implemented by this simulated dataset.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public Map<String, Object> getVolumeInfoMap() {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e01a89b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
index cf37a54..6098ebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;


Mime
View raw message