hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject [29/34] git commit: HDFS-7112. LazyWriter should use either async IO or one thread per physical disk. Contributed by Xiaoyu Yao.
Date Fri, 17 Oct 2014 21:45:16 GMT
HDFS-7112. LazyWriter should use either async IO or one thread per physical disk. Contributed
by Xiaoyu Yao.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69aacf19
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69aacf19
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69aacf19

Branch: refs/heads/branch-2
Commit: 69aacf19c1549d2a7bdb44f85a83feba76824fe8
Parents: ae8c9cd
Author: cnauroth <cnauroth@apache.org>
Authored: Tue Oct 7 20:25:19 2014 -0700
Committer: Jitendra Pandey <Jitendra@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:03 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  95 +++++++
 .../server/datanode/fsdataset/FsDatasetSpi.java |  13 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  21 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 179 ++++++++-----
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  15 +-
 .../impl/RamDiskAsyncLazyPersistService.java    | 252 +++++++++++++++++++
 .../server/datanode/SimulatedFSDataset.java     |  12 +
 .../fsdataset/impl/TestLazyPersistFiles.java    |   4 +-
 8 files changed, 506 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 30b0e66..27e76ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -619,6 +619,101 @@ Release 2.6.0 - UNRELEASED
     HDFS-5089. When a LayoutVersion support SNAPSHOT, it must support
     FSIMAGE_NAME_OPTIMIZATION.  (szetszwo)
 
+    BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
+  
+      HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)
+  
+      HDFS-6924. Add new RAM_DISK storage type. (Arpit Agarwal)
+  
+      HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
+      edit logs. (Arpit Agarwal)
+    
+      HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
+      (Arpit Agarwal)
+  
+      HDFS-6925. DataNode should attempt to place replicas on transient storage
+      first if lazyPersist flag is received. (Arpit Agarwal)
+  
+      HDFS-6926. DN support for saving replicas to persistent storage and
+      evicting in-memory replicas. (Arpit Agarwal)
+  
+      HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
+  
+      HDFS-6929. NN periodically unlinks lazy persist files with missing
+      replicas from namespace. (Arpit Agarwal)
+  
+      HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
+      (Arpit Agarwal)
+  
+      HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+      (Arpit Agarwal)
+  
+      HDFS-6931. Move lazily persisted replicas to finalized directory on DN
+      startup. (Arpit Agarwal)
+  
+      HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via
+      Arpit Agarwal)
+  
+      HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
+  
+      HDFS-6977. Delete all copies when a block is deleted from the block space.
+      (Arpit Agarwal)
+  
+      HDFS-6991. Notify NN of evicted block before deleting it from RAM disk.
+      (Arpit Agarwal)
+  
+      HDFS-6978. Directory scanner should correctly reconcile blocks on RAM
+      disk. (Arpit Agarwal)
+  
+      HDFS-7066. LazyWriter#evictBlocks misses a null check for replicaState.
+      (Xiaoyu Yao via Arpit Agarwal)
+  
+      HDFS-7064. Fix unit test failures in HDFS-6581 branch. (Xiaoyu Yao via
+      Arpit Agarwal)
+  
+      HDFS-6581. Few more unit test fixes for HDFS-6581. (Arpit Agarwal)
+  
+      HDFS-7080. Fix finalize and upgrade unit test failures. (Arpit Agarwal)
+  
+      HDFS-7084. FsDatasetImpl#copyBlockFiles debug log can be improved.
+      (Xiaoyu Yao via Arpit Agarwal)
+  
+      HDFS-7091. Add forwarding constructor for INodeFile for existing callers.
+      (Arpit Agarwal)
+  
+      HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
+  
+      HDFS-7108. Fix unit test failures in SimulatedFsDataset. (Arpit Agarwal)
+
+      HDFS-6990. Add unit test for evict/delete RAM_DISK block with open
+      handle. (Xiaoyu Yao via Arpit Agarwal)
+  
+      HDFS-7143. Fix findbugs warnings in HDFS-6581 branch. (szetszwo via
+      Arpit Agarwal)
+  
+      HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK.
+      (Xiaoyu Yao via Arpit Agarwal)
+  
+      HDFS-7144. Fix findbugs warnings in RamDiskReplicaTracker. (szetszwo via
+      Arpit Agarwal)
+  
+      HDFS-7155. Bugfix in createLocatedFileStatus caused by bad merge.
+      (Arpit Agarwal)
+  
+      HDFS-7153. Add storagePolicy to NN edit log during file creation.
+      (Arpit Agarwal)
+  
+      HDFS-7159. Use block storage policy to set lazy persist preference.
+      (Arpit Agarwal)
+  
+      HDFS-7129. Metrics to track usage of memory for writes. (Xiaoyu Yao
+      via Arpit Agarwal)
+  
+      HDFS-7171. Fix Jenkins failures in HDFS-6581 branch. (Arpit Agarwal)
+  
+      HDFS-7112. LazyWriter should use either async IO or one thread per physical
+      disk. (Xiaoyu Yao via cnauroth)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 2bb2e7f..3f1400d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -463,5 +464,15 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean
{
   public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
       final FileDescriptor fd, final long offset, final long nbytes,
       final int flags);
-}
 
+  /**
+   * Callback from RamDiskAsyncLazyPersistService once a replica has been persisted.
+   */
+   public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume);
+
+   /**
+    * Callback from RamDiskAsyncLazyPersistService when a lazy persist task fails.
+    */
+   public void onFailLazyPersist(String bpId, long blockId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 3f58d38..dce2ff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -171,6 +171,10 @@ class BlockPoolSlice {
   long getDfsUsed() throws IOException {
     return dfsUsage.getUsed();
   }
+
+  void incDfsUsed(long value) {
+    dfsUsage.incDfsUsed(value);
+  }
   
    /**
    * Read in the cached DU value and return it if it is less than 600 seconds
@@ -277,23 +281,6 @@ class BlockPoolSlice {
   }
 
   /**
-   * Save the given replica to persistent storage.
-   *
-   * @return The saved meta and block files, in that order.
-   * @throws IOException
-   */
-  File[] lazyPersistReplica(long blockId, long genStamp,
-                            File srcMeta, File srcFile) throws IOException {
-    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
-      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
-    }
-    File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-        blockId, genStamp, srcMeta, srcFile, lazypersistDir);
-    dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
-    return targetFiles;
-  }
-
-  /**
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f2daf99..07e19cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -220,6 +220,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   final ReplicaMap volumeMap;
   final RamDiskReplicaTracker ramDiskReplicaTracker;
+  final RamDiskAsyncLazyPersistService asyncLazyPersistService;
 
   private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
 
@@ -273,10 +274,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volsFailed, blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
     }
+    setupAsyncLazyPersistThreads();
 
     cacheManager = new FsDatasetCache(this);
 
@@ -409,6 +412,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
+    setupAsyncLazyPersistThreads();
+
     for (int i = 0; i < volumes.size(); i++) {
       if (successFlags[i]) {
         succeedVolumes.add(volumes.get(i));
@@ -462,6 +467,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         storageMap.remove(sd.getStorageUuid());
       }
     }
+    setupAsyncLazyPersistThreads();
   }
 
   private StorageType getStorageTypeFromLocations(
@@ -1506,10 +1512,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         RamDiskReplica replicaInfo =
           ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
         if (replicaInfo != null) {
-          if (replicaInfo.getIsPersisted() ==  false) {
+          if (!replicaInfo.getIsPersisted()) {
             datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
           }
-          discardRamDiskReplica(replicaInfo, true);
+          ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
+            replicaInfo.getBlockId(), true);
         }
       }
 
@@ -1750,6 +1757,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (asyncDiskService != null) {
       asyncDiskService.shutdown();
     }
+
+    if (asyncLazyPersistService != null) {
+      asyncLazyPersistService.shutdown();
+    }
     
     if(volumes != null) {
       volumes.shutdown();
@@ -2309,6 +2320,40 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+    synchronized (FsDatasetImpl.this) {
+      ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
+
+      targetVolume.incDfsUsed(bpId,
+          savedFiles[0].length() + savedFiles[1].length());
+
+      // Update metrics (ignore the metadata file size)
+      datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+      datanode.getMetrics().incrRamDiskBytesLazyPersisted(savedFiles[1].length());
+      datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+          Time.monotonicNow() - creationTime);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter: Finish persisting RamDisk block: "
+            + " block pool Id: " + bpId + " block id: " + blockId
+            + " to block file " + savedFiles[1] + " and meta file " + savedFiles[0]
+            + " on target volume " + targetVolume);
+      }
+    }
+  }
+
+  @Override
+  public void onFailLazyPersist(String bpId, long blockId) {
+    // Only a replica the tracker still knows about can be queued for a retry.
+    RamDiskReplica block = ramDiskReplicaTracker.getReplica(bpId, blockId);
+    if (block != null) {
+      LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
+      ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+    }
+  }
+
+  @Override
   public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
       FileDescriptor fd, long offset, long nbytes, int flags) {
     FsVolumeImpl fsVolumeImpl = this.getVolume(block);
@@ -2316,9 +2361,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         nbytes, flags);
   }
 
-  void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
-    ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
-      replica.getBlockId(), deleteSavedCopies);
+  private boolean ramDiskConfigured() {
+    for (FsVolumeImpl v: getVolumes()){
+      if (v.isTransientStorage()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // Add/Remove per DISK volume async lazy persist thread when RamDisk volume is
+  // added or removed.
+  // This should only be called when the FsDataSetImpl#volumes list is finalized.
+  private void setupAsyncLazyPersistThreads() {
+    boolean ramDiskConfigured = ramDiskConfigured();
+    for (FsVolumeImpl v: getVolumes()){
+      // Skip transient volumes
+      if (v.isTransientStorage()) {
+        continue;
+      }
+
+      // Add thread for DISK volume if RamDisk is configured
+      if (ramDiskConfigured &&
+          !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+        asyncLazyPersistService.addVolume(v.getCurrentDir());
+      }
+
+      // Remove thread for DISK volume if RamDisk is not configured
+      if (!ramDiskConfigured &&
+          asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+        asyncLazyPersistService.removeVolume(v.getCurrentDir());
+      }
+    }
   }
 
   class LazyWriter implements Runnable {
@@ -2344,61 +2418,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
     }
 
-    private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
-        throws IOException {
-
-      FsVolumeImpl targetVolume;
-      ReplicaInfo replicaInfo;
-      BlockPoolSlice bpSlice;
-      File srcFile, srcMeta;
-      long genStamp;
-
-      synchronized (FsDatasetImpl.this) {
-        replicaInfo = volumeMap.get(bpid, blockId);
-
-        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
-          // The block was either deleted before it could be checkpointed or
-          // it is already on persistent storage. This can occur if a second
-          // replica on persistent storage was found after the lazy write was
-          // scheduled.
-          return;
-        }
-
-        // Pick a target volume for the block.
-        targetVolume = volumes.getNextVolume(
-            StorageType.DEFAULT, replicaInfo.getNumBytes());
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
-        }
-
-        ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
-        bpSlice = targetVolume.getBlockPoolSlice(bpid);
-        srcMeta = replicaInfo.getMetaFile();
-        srcFile = replicaInfo.getBlockFile();
-        genStamp = replicaInfo.getGenerationStamp();
-      }
-
-      // Drop the FsDatasetImpl lock for the file copy.
-      File[] savedFiles =
-          bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
-
-      synchronized (FsDatasetImpl.this) {
-        ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
-
-        // Update metrics (ignore the metadata file size)
-        datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
-        datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
-        datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
-          Time.monotonicNow() - creationTime);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-                        " to file " + savedFiles[1]);
-        }
-      }
-    }
-
     /**
      * Checkpoint a pending replica to persistent storage now.
      * If we fail then move the replica to the end of the queue.
@@ -2406,13 +2425,43 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      */
     private boolean saveNextReplica() {
       RamDiskReplica block = null;
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
       boolean succeeded = false;
 
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
-            block.getCreationTime());
+          synchronized (FsDatasetImpl.this) {
+            replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
+
+            // If replicaInfo is null, the block was either deleted before
+            // it could be checkpointed or it is already on persistent storage.
+            // This can occur if a second replica on persistent storage was found
+            // after the lazy write was scheduled.
+            if (replicaInfo != null &&
+                replicaInfo.getVolume().isTransientStorage()) {
+              // Pick a target volume to persist the block.
+              targetVolume = volumes.getNextVolume(
+                  StorageType.DEFAULT, replicaInfo.getNumBytes());
+
+              ramDiskReplicaTracker.recordStartLazyPersist(
+                  block.getBlockPoolId(), block.getBlockId(), targetVolume);
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("LazyWriter: Start persisting RamDisk block:"
+                    + " block pool Id: " + block.getBlockPoolId()
+                    + " block id: " + block.getBlockId()
+                    + " on target volume " + targetVolume);
+              }
+
+              asyncLazyPersistService.submitLazyPersistTask(
+                  block.getBlockPoolId(), block.getBlockId(),
+                  replicaInfo.getGenerationStamp(), block.getCreationTime(),
+                  replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
+                  targetVolume);
+            }
+          }
         }
         succeeded = true;
       } catch(IOException ioe) {
@@ -2420,10 +2469,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       } finally {
         if (!succeeded && block != null) {
           LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
-          ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+          onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
         }
       }
-
       return succeeded;
     }
 
@@ -2480,7 +2528,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           metaFile = replicaInfo.getMetaFile();
           blockFileUsed = blockFile.length();
           metaFileUsed = metaFile.length();
-          discardRamDiskReplica(replicaState, false);
+          ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
+              replicaState.getBlockId(), false);
 
           // Move the replica from lazyPersist/ to finalized/ on target volume
           BlockPoolSlice bpSlice =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 60ea125..32709be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -124,7 +124,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
   File getRbwDir(String bpid) throws IOException {
     return getBlockPoolSlice(bpid).getRbwDir();
   }
-  
+
+  File getLazyPersistDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getLazypersistDir();
+  }
+
   void decDfsUsed(String bpid, long value) {
     synchronized(dataset) {
       BlockPoolSlice bp = bpSlices.get(bpid);
@@ -134,6 +138,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
+  void incDfsUsed(String bpid, long value) {
+    synchronized(dataset) {
+      BlockPoolSlice bp = bpSlices.get(bpid);
+      if (bp != null) {
+        bp.incDfsUsed(value);
+      }
+    }
+  }
+
   long getDfsUsed() throws IOException {
     long dfsUsed = 0;
     synchronized(dataset) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
new file mode 100644
index 0000000..76acbea
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is a container of multiple thread pools, one for each non-RamDisk
+ * volume with a maximum thread count of 1 so that we can schedule async lazy
+ * persist operations easily with volume arrival and departure handled.
+ *
+ * This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
+ * They should be combined.
+ */
+class RamDiskAsyncLazyPersistService {
+  public static final Log LOG = LogFactory.getLog(RamDiskAsyncLazyPersistService.class);
+
+  // ThreadPool core pool size
+  private static final int CORE_THREADS_PER_VOLUME = 1;
+  // ThreadPool maximum pool size
+  private static final int MAXIMUM_THREADS_PER_VOLUME = 1;
+  // ThreadPool keep-alive time for threads over core pool size
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
+  private final DataNode datanode;
+  private final ThreadGroup threadGroup;
+  private Map<File, ThreadPoolExecutor> executors
+      = new HashMap<File, ThreadPoolExecutor>();
+
+  /**
+   * Create a RamDiskAsyncLazyPersistService with no volumes registered yet;
+   * per-volume thread pools are created later through {@link #addVolume(File)}.
+   *
+   * @param datanode the DataNode whose dataset receives the completion and
+   *                 failure callbacks of the lazy persist tasks.
+   */
+  RamDiskAsyncLazyPersistService(DataNode datanode) {
+    this.datanode = datanode;
+    this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+  }
+
+  private void addExecutorForVolume(final File volume) {
+    ThreadFactory threadFactory = new ThreadFactory() {
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(threadGroup, r);
+        t.setName("Async RamDisk lazy persist worker for volume " + volume);
+        return t;
+      }
+    };
+
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+    // This can reduce the number of running threads
+    executor.allowCoreThreadTimeOut(true);
+    executors.put(volume, executor);
+  }
+
+  /**
+   * Starts AsyncLazyPersistService for a new volume
+   * @param volume the root of the new data volume.
+   */
+  synchronized void addVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor != null) {
+      throw new RuntimeException("Volume " + volume + " is already existed.");
+    }
+    addExecutorForVolume(volume);
+  }
+
+  /**
+   * Stops the lazy persist thread pool of a volume; queued tasks still run.
+   * @param volume the root of a volume previously passed to addVolume.
+   */
+  synchronized void removeVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor == null) {
+      throw new RuntimeException("Can not find volume " + volume
+        + " to remove.");
+    } else {
+      executor.shutdown();
+      executors.remove(volume);
+    }
+  }
+
+  /**
+   * Query if the thread pool exist for the volume
+   * @param volume the root of a volume
+   * @return true if there is one thread pool for the volume
+   *         false otherwise
+   */
+  synchronized boolean queryVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(volume);
+    return (executor != null);
+  }
+
+  /**
+   * Execute the task sometime in the future, using ThreadPools.
+   */
+  synchronized void execute(File root, Runnable task) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(root);
+    if (executor == null) {
+      throw new RuntimeException("Cannot find root " + root
+          + " for execution of task " + task);
+    } else {
+      executor.execute(task);
+    }
+  }
+
+  /**
+   * Initiate an orderly shutdown of every per-volume thread pool. Already
+   * submitted tasks still run, but this method does not wait for them.
+   */
+  synchronized void shutdown() {
+    if (executors == null) {
+      LOG.warn("AsyncLazyPersistService has already shut down.");
+    } else {
+      LOG.info("Shutting down all async lazy persist service threads");
+
+      for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
+        e.getValue().shutdown();
+      }
+      // clear the executor map so that calling execute again will fail.
+      executors = null;
+      LOG.info("All async lazy persist service threads have been shut down");
+    }
+  }
+
+  /**
+   * Asynchronously lazy persist the block from the RamDisk to Disk.
+   */
+  void submitLazyPersistTask(String bpId, long blockId,
+      long genStamp, long creationTime,
+      File metaFile, File blockFile,
+      FsVolumeImpl targetVolume) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
+          + bpId + " block id: " + blockId);
+    }
+
+    File lazyPersistDir  = targetVolume.getLazyPersistDir(bpId);
+    if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
+      FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
+      throw new IOException("LazyWriter fail to find or create lazy persist dir: "
+          + lazyPersistDir.toString());
+    }
+
+    ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
+        bpId, blockId, genStamp, creationTime, blockFile, metaFile,
+        targetVolume, lazyPersistDir);
+    execute(targetVolume.getCurrentDir(), lazyPersistTask);
+  }
+
+  class ReplicaLazyPersistTask implements Runnable {
+    final String bpId;
+    final long blockId;
+    final long genStamp;
+    final long creationTime;
+    final File blockFile;
+    final File metaFile;
+    final FsVolumeImpl targetVolume;
+    final File lazyPersistDir;
+
+    ReplicaLazyPersistTask(String bpId, long blockId,
+        long genStamp, long creationTime,
+        File blockFile, File metaFile,
+        FsVolumeImpl targetVolume, File lazyPersistDir) {
+      this.bpId = bpId;
+      this.blockId = blockId;
+      this.genStamp = genStamp;
+      this.creationTime = creationTime;
+      this.blockFile = blockFile;
+      this.metaFile = metaFile;
+      this.targetVolume = targetVolume;
+      this.lazyPersistDir = lazyPersistDir;
+    }
+
+    @Override
+    public String toString() {
+      // Called in AsyncLazyPersistService.execute for displaying error messages.
+      return "LazyWriter async task of persist RamDisk block pool id:" + bpId
+          + " block id: " + blockId + " with block file " + blockFile
+          + " and meta file " + metaFile + " to target volume " + targetVolume;
+    }
+
+    @Override
+    public void run() {
+      boolean succeeded = false;
+      try {
+        // No FsDatasetImpl lock for the file copy
+        File targetFiles[] = FsDatasetImpl.copyBlockFiles(
+            blockId, genStamp, metaFile, blockFile, lazyPersistDir);
+
+        // Lock FsDataSetImpl during onCompleteLazyPersist callback
+        datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,
+            creationTime, targetFiles, targetVolume);
+        succeeded = true;
+      } catch (Exception e){
+        // Keep the cause in the log; the failure callback runs in finally.
+        FsDatasetImpl.LOG.warn("LazyWriter failed to async persist RamDisk"
+            + " block pool id: " + bpId + " block Id: " + blockId, e);
+      } finally {
+        if (!succeeded) {
+          datanode.getFSDataset().onFailLazyPersist(bpId, blockId);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index d1284fe..0786bc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -1209,5 +1210,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       FileDescriptor fd, long offset, long nbytes, int flags) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void onCompleteLazyPersist(String bpId, long blockId,
+      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+    throw new UnsupportedOperationException(); // not implemented by this class
+  }
+
+  @Override
+  public void onFailLazyPersist(String bpId, long blockId) {
+    throw new UnsupportedOperationException(); // not implemented by this class
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aacf19/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 91deb55..9f1d50a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -971,7 +971,9 @@ public class TestLazyPersistFiles {
 
   void printRamDiskJMXMetrics() {
     try {
-      jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      if (jmx != null) {
+        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      }
     } catch (Exception e) {
       e.printStackTrace();
     }


Mime
View raw message