hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject [2/2] hadoop git commit: HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File. (Virajith Jalaparti via lei)
Date Mon, 10 Oct 2016 22:30:10 GMT
HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File. (Virajith Jalaparti via lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96b12662
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96b12662
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96b12662

Branch: refs/heads/trunk
Commit: 96b12662ea76e3ded4ef13944fc8df206cfb4613
Parents: 0773ffd
Author: Lei Xu <lei@apache.org>
Authored: Mon Oct 10 15:28:19 2016 -0700
Committer: Lei Xu <lei@apache.org>
Committed: Mon Oct 10 15:30:03 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/common/Storage.java      |  22 ++
 .../server/datanode/BlockPoolSliceStorage.java  |  20 +-
 .../hdfs/server/datanode/BlockScanner.java      |   8 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  34 +-
 .../hdfs/server/datanode/DataStorage.java       |  34 +-
 .../hdfs/server/datanode/DirectoryScanner.java  | 320 +------------------
 .../hdfs/server/datanode/DiskBalancer.java      |  25 +-
 .../hdfs/server/datanode/LocalReplica.java      |   2 +-
 .../hdfs/server/datanode/ReplicaInfo.java       |   2 +-
 .../hdfs/server/datanode/StorageLocation.java   |  32 +-
 .../hdfs/server/datanode/VolumeScanner.java     |  27 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   5 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  | 234 +++++++++++++-
 .../impl/FsDatasetAsyncDiskService.java         |  40 ++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 136 ++++----
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 233 ++++++++++++--
 .../fsdataset/impl/FsVolumeImplBuilder.java     |  65 ++++
 .../datanode/fsdataset/impl/FsVolumeList.java   |  44 +--
 .../impl/RamDiskAsyncLazyPersistService.java    |  79 +++--
 .../fsdataset/impl/VolumeFailureInfo.java       |  13 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   2 +-
 .../TestNameNodePrunesMissingStorages.java      |  15 +-
 .../server/datanode/SimulatedFSDataset.java     |  46 ++-
 .../hdfs/server/datanode/TestBlockScanner.java  |   3 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    |  15 +-
 .../datanode/TestDataNodeVolumeFailure.java     |  12 +-
 .../TestDataNodeVolumeFailureReporting.java     |  10 +
 .../server/datanode/TestDirectoryScanner.java   |  76 +++--
 .../hdfs/server/datanode/TestDiskError.java     |   2 +-
 .../extdataset/ExternalDatasetImpl.java         |  10 +-
 .../datanode/extdataset/ExternalVolumeImpl.java |  44 ++-
 .../fsdataset/impl/FsDatasetImplTestUtils.java  |   9 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |  69 ++--
 .../fsdataset/impl/TestFsVolumeList.java        |  83 +++--
 .../TestDiskBalancerWithMockMover.java          |   4 +-
 35 files changed, 1062 insertions(+), 713 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 9218e9d..e55de35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.apache.hadoop.util.ToolRunner;
@@ -269,11 +270,17 @@ public abstract class Storage extends StorageInfo {
 
     private String storageUuid = null;      // Storage directory identifier.
     
+    private final StorageLocation location;
     public StorageDirectory(File dir) {
       // default dirType is null
       this(dir, null, false);
     }
     
+    public StorageDirectory(StorageLocation location) {
+      // default dirType is null
+      this(location.getFile(), null, false, location);
+    }
+
     public StorageDirectory(File dir, StorageDirType dirType) {
       this(dir, dirType, false);
     }
@@ -294,11 +301,22 @@ public abstract class Storage extends StorageInfo {
      *          disables locking on the storage directory, false enables locking
      */
     public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
+      this(dir, dirType, isShared, null);
+    }
+
+    public StorageDirectory(File dir, StorageDirType dirType,
+        boolean isShared, StorageLocation location) {
       this.root = dir;
       this.lock = null;
       this.dirType = dirType;
       this.isShared = isShared;
+      this.location = location;
+      assert location == null ||
+          dir.getAbsolutePath().startsWith(
+              location.getFile().getAbsolutePath()):
+            "The storage location and directory should be equal";
     }
+
     
     /**
      * Get root directory of this storage
@@ -861,6 +879,10 @@ public abstract class Storage extends StorageInfo {
       }
       return false;
     }
+
+    public StorageLocation getStorageLocation() {
+      return location;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index fd89611..e3b6da1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -147,10 +147,10 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException
    */
   private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
-      File dataDir, StartupOption startOpt,
+      File dataDir, StorageLocation location, StartupOption startOpt,
       List<Callable<StorageDirectory>> callables, Configuration conf)
           throws IOException {
-    StorageDirectory sd = new StorageDirectory(dataDir, null, true);
+    StorageDirectory sd = new StorageDirectory(dataDir, null, true, location);
     try {
       StorageState curState = sd.analyzeStorage(startOpt, this, true);
       // sd is locked but not opened
@@ -208,9 +208,9 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException on error
    */
   List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt,
-      List<Callable<StorageDirectory>> callables, Configuration conf)
-          throws IOException {
+      Collection<File> dataDirs, StorageLocation location,
+      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+      Configuration conf) throws IOException {
     List<StorageDirectory> succeedDirs = Lists.newArrayList();
     try {
       for (File dataDir : dataDirs) {
@@ -220,7 +220,7 @@ public class BlockPoolSliceStorage extends Storage {
                   "attempt to load an used block storage: " + dataDir);
         }
         final StorageDirectory sd = loadStorageDirectory(
-            nsInfo, dataDir, startOpt, callables, conf);
+            nsInfo, dataDir, location, startOpt, callables, conf);
         succeedDirs.add(sd);
       }
     } catch (IOException e) {
@@ -244,12 +244,12 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException on error
    */
   List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt,
-      List<Callable<StorageDirectory>> callables, Configuration conf)
-          throws IOException {
+      Collection<File> dataDirs, StorageLocation location,
+      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+      Configuration conf) throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
     final List<StorageDirectory> loaded = loadBpStorageDirectories(
-        nsInfo, dataDirs, startOpt, callables, conf);
+        nsInfo, dataDirs, location, startOpt, callables, conf);
     for (StorageDirectory sd : loaded) {
       addStorageDir(sd);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 456dcc1..21484fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -201,17 +201,17 @@ public class BlockScanner {
       FsVolumeSpi volume = ref.getVolume();
       if (!isEnabled()) {
         LOG.debug("Not adding volume scanner for {}, because the block " +
-            "scanner is disabled.", volume.getBasePath());
+            "scanner is disabled.", volume);
         return;
       }
       VolumeScanner scanner = scanners.get(volume.getStorageID());
       if (scanner != null) {
         LOG.error("Already have a scanner for volume {}.",
-            volume.getBasePath());
+            volume);
         return;
       }
       LOG.debug("Adding scanner for volume {} (StorageID {})",
-          volume.getBasePath(), volume.getStorageID());
+          volume, volume.getStorageID());
       scanner = new VolumeScanner(conf, datanode, ref);
       scanner.start();
       scanners.put(volume.getStorageID(), scanner);
@@ -245,7 +245,7 @@ public class BlockScanner {
       return;
     }
     LOG.info("Removing scanner for volume {} (StorageID {})",
-        volume.getBasePath(), volume.getStorageID());
+        volume, volume.getStorageID());
     scanner.shutdown();
     scanners.remove(volume.getStorageID());
     Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index dd7e426..cb8e308 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -58,7 +58,6 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -78,7 +77,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -791,11 +789,7 @@ public class DataNode extends ReconfigurableBase
     if (locations.isEmpty()) {
       return;
     }
-    Set<File> volumesToRemove = new HashSet<>();
-    for (StorageLocation loc : locations) {
-      volumesToRemove.add(loc.getFile().getAbsoluteFile());
-    }
-    removeVolumes(volumesToRemove, true);
+    removeVolumes(locations, true);
   }
 
   /**
@@ -814,26 +808,22 @@ public class DataNode extends ReconfigurableBase
    * @throws IOException
    */
   private synchronized void removeVolumes(
-      final Set<File> absoluteVolumePaths, boolean clearFailure)
+      final Collection<StorageLocation> storageLocations, boolean clearFailure)
       throws IOException {
-    for (File vol : absoluteVolumePaths) {
-      Preconditions.checkArgument(vol.isAbsolute());
-    }
-
-    if (absoluteVolumePaths.isEmpty()) {
+    if (storageLocations.isEmpty()) {
       return;
     }
 
     LOG.info(String.format("Deactivating volumes (clear failure=%b): %s",
-        clearFailure, Joiner.on(",").join(absoluteVolumePaths)));
+        clearFailure, Joiner.on(",").join(storageLocations)));
 
     IOException ioe = null;
     // Remove volumes and block infos from FsDataset.
-    data.removeVolumes(absoluteVolumePaths, clearFailure);
+    data.removeVolumes(storageLocations, clearFailure);
 
     // Remove volumes from DataStorage.
     try {
-      storage.removeVolumes(absoluteVolumePaths);
+      storage.removeVolumes(storageLocations);
     } catch (IOException e) {
       ioe = e;
     }
@@ -841,7 +831,7 @@ public class DataNode extends ReconfigurableBase
     // Set configuration and dataDirs to reflect volume changes.
     for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext(); ) {
       StorageLocation loc = it.next();
-      if (absoluteVolumePaths.contains(loc.getFile().getAbsoluteFile())) {
+      if (storageLocations.contains(loc)) {
         it.remove();
       }
     }
@@ -3242,18 +3232,18 @@ public class DataNode extends ReconfigurableBase
    * Check the disk error
    */
   private void checkDiskError() {
-    Set<File> unhealthyDataDirs = data.checkDataDir();
-    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
+    Set<StorageLocation> unhealthyLocations = data.checkDataDir();
+    if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) {
       try {
         // Remove all unhealthy volumes from DataNode.
-        removeVolumes(unhealthyDataDirs, false);
+        removeVolumes(unhealthyLocations, false);
       } catch (IOException e) {
         LOG.warn("Error occurred when removing unhealthy storage dirs: "
             + e.getMessage(), e);
       }
       StringBuilder sb = new StringBuilder("DataNode failed volumes:");
-      for (File dataDir : unhealthyDataDirs) {
-        sb.append(dataDir.getAbsolutePath() + ";");
+      for (StorageLocation location : unhealthyLocations) {
+        sb.append(location + ";");
       }
       handleDiskError(sb.toString());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 7e620c2..7c9bea5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -263,9 +263,10 @@ public class DataStorage extends Storage {
   }
 
   private StorageDirectory loadStorageDirectory(DataNode datanode,
-      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
-      List<Callable<StorageDirectory>> callables) throws IOException {
-    StorageDirectory sd = new StorageDirectory(dataDir, null, false);
+      NamespaceInfo nsInfo, File dataDir, StorageLocation location,
+      StartupOption startOpt, List<Callable<StorageDirectory>> callables)
+          throws IOException {
+    StorageDirectory sd = new StorageDirectory(dataDir, null, false, location);
     try {
       StorageState curState = sd.analyzeStorage(startOpt, this, true);
       // sd is locked but not opened
@@ -310,7 +311,7 @@ public class DataStorage extends Storage {
    * builder later.
    *
    * @param datanode DataNode object.
-   * @param volume the root path of a storage directory.
+   * @param location the StorageLocation for the storage directory.
    * @param nsInfos an array of namespace infos.
    * @return a VolumeBuilder that holds the metadata of this storage directory
    * and can be added to DataStorage later.
@@ -318,8 +319,10 @@ public class DataStorage extends Storage {
    *
    * Note that if there is IOException, the state of DataStorage is not modified.
    */
-  public VolumeBuilder prepareVolume(DataNode datanode, File volume,
-      List<NamespaceInfo> nsInfos) throws IOException {
+  public VolumeBuilder prepareVolume(DataNode datanode,
+      StorageLocation location, List<NamespaceInfo> nsInfos)
+          throws IOException {
+    File volume = location.getFile();
     if (containsStorageDir(volume)) {
       final String errorMessage = "Storage directory is in use";
       LOG.warn(errorMessage + ".");
@@ -327,7 +330,8 @@ public class DataStorage extends Storage {
     }
 
     StorageDirectory sd = loadStorageDirectory(
-        datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
+        datanode, nsInfos.get(0), volume, location,
+        StartupOption.HOTSWAP, null);
     VolumeBuilder builder =
         new VolumeBuilder(this, sd);
     for (NamespaceInfo nsInfo : nsInfos) {
@@ -338,7 +342,8 @@ public class DataStorage extends Storage {
 
       final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
       final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
-          nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
+          nsInfo, bpDataDirs, location, StartupOption.HOTSWAP,
+          null, datanode.getConf());
       builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
     }
     return builder;
@@ -407,7 +412,7 @@ public class DataStorage extends Storage {
           final List<Callable<StorageDirectory>> callables
               = Lists.newArrayList();
           final StorageDirectory sd = loadStorageDirectory(
-              datanode, nsInfo, root, startOpt, callables);
+              datanode, nsInfo, root, dataDir, startOpt, callables);
           if (callables.isEmpty()) {
             addStorageDir(sd);
             success.add(dataDir);
@@ -458,7 +463,8 @@ public class DataStorage extends Storage {
 
         final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
         final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
-            nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
+            nsInfo, bpDataDirs, dataDir, startOpt,
+            callables, datanode.getConf());
         if (callables.isEmpty()) {
           for(StorageDirectory sd : dirs) {
             success.add(sd);
@@ -498,9 +504,10 @@ public class DataStorage extends Storage {
    * @param dirsToRemove a set of storage directories to be removed.
    * @throws IOException if I/O error when unlocking storage directory.
    */
-  synchronized void removeVolumes(final Set<File> dirsToRemove)
+  synchronized void removeVolumes(
+      final Collection<StorageLocation> storageLocations)
       throws IOException {
-    if (dirsToRemove.isEmpty()) {
+    if (storageLocations.isEmpty()) {
       return;
     }
 
@@ -508,7 +515,8 @@ public class DataStorage extends Storage {
     for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
          it.hasNext(); ) {
       StorageDirectory sd = it.next();
-      if (dirsToRemove.contains(sd.getRoot())) {
+      StorageLocation sdLocation = sd.getStorageLocation();
+      if (storageLocations.contains(sdLocation)) {
         // Remove the block pool level storage first.
         for (Map.Entry<String, BlockPoolSliceStorage> entry :
             this.bpStorageMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index c50bfaf..58071dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -37,9 +36,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -47,10 +43,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.Time;
@@ -209,200 +204,6 @@ public class DirectoryScanner implements Runnable {
     }
   }
 
-  /**
-   * Tracks the files and other information related to a block on the disk
-   * Missing file is indicated by setting the corresponding member
-   * to null.
-   * 
-   * Because millions of these structures may be created, we try to save
-   * memory here.  So instead of storing full paths, we store path suffixes.
-   * The block file, if it exists, will have a path like this:
-   * <volume_base_path>/<block_path>
-   * So we don't need to store the volume path, since we already know what the
-   * volume is.
-   * 
-   * The metadata file, if it exists, will have a path like this:
-   * <volume_base_path>/<block_path>_<genstamp>.meta
-   * So if we have a block file, there isn't any need to store the block path
-   * again.
-   * 
-   * The accessor functions take care of these manipulations.
-   */
-  static class ScanInfo implements Comparable<ScanInfo> {
-    private final long blockId;
-    
-    /**
-     * The block file path, relative to the volume's base directory.
-     * If there was no block file found, this may be null. If 'vol'
-     * is null, then this is the full path of the block file.
-     */
-    private final String blockSuffix;
-    
-    /**
-     * The suffix of the meta file path relative to the block file.
-     * If blockSuffix is null, then this will be the entire path relative
-     * to the volume base directory, or an absolute path if vol is also
-     * null.
-     */
-    private final String metaSuffix;
-
-    private final FsVolumeSpi volume;
-
-    /**
-     * Get the file's length in async block scan
-     */
-    private final long blockFileLength;
-
-    private final static Pattern CONDENSED_PATH_REGEX =
-        Pattern.compile("(?<!^)(\\\\|/){2,}");
-    
-    private final static String QUOTED_FILE_SEPARATOR = 
-        Matcher.quoteReplacement(File.separator);
-    
-    /**
-     * Get the most condensed version of the path.
-     *
-     * For example, the condensed version of /foo//bar is /foo/bar
-     * Unlike {@link File#getCanonicalPath()}, this will never perform I/O
-     * on the filesystem.
-     *
-     * @param path the path to condense
-     * @return the condensed path
-     */
-    private static String getCondensedPath(String path) {
-      return CONDENSED_PATH_REGEX.matcher(path).
-          replaceAll(QUOTED_FILE_SEPARATOR);
-    }
-
-    /**
-     * Get a path suffix.
-     *
-     * @param f            The file to get the suffix for.
-     * @param prefix       The prefix we're stripping off.
-     *
-     * @return             A suffix such that prefix + suffix = path to f
-     */
-    private static String getSuffix(File f, String prefix) {
-      String fullPath = getCondensedPath(f.getAbsolutePath());
-      if (fullPath.startsWith(prefix)) {
-        return fullPath.substring(prefix.length());
-      }
-      throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
-    }
-
-    /**
-     * Create a ScanInfo object for a block. This constructor will examine
-     * the block data and meta-data files.
-     *
-     * @param blockId the block ID
-     * @param blockFile the path to the block data file
-     * @param metaFile the path to the block meta-data file
-     * @param vol the volume that contains the block
-     */
-    ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
-      this.blockId = blockId;
-      String condensedVolPath = vol == null ? null :
-        getCondensedPath(vol.getBasePath());
-      this.blockSuffix = blockFile == null ? null :
-        getSuffix(blockFile, condensedVolPath);
-      this.blockFileLength = (blockFile != null) ? blockFile.length() : 0; 
-      if (metaFile == null) {
-        this.metaSuffix = null;
-      } else if (blockFile == null) {
-        this.metaSuffix = getSuffix(metaFile, condensedVolPath);
-      } else {
-        this.metaSuffix = getSuffix(metaFile,
-            condensedVolPath + blockSuffix);
-      }
-      this.volume = vol;
-    }
-
-    /**
-     * Returns the block data file.
-     *
-     * @return the block data file
-     */
-    File getBlockFile() {
-      return (blockSuffix == null) ? null :
-        new File(volume.getBasePath(), blockSuffix);
-    }
-
-    /**
-     * Return the length of the data block. The length returned is the length
-     * cached when this object was created.
-     *
-     * @return the length of the data block
-     */
-    long getBlockFileLength() {
-      return blockFileLength;
-    }
-
-    /**
-     * Returns the block meta data file or null if there isn't one.
-     *
-     * @return the block meta data file
-     */
-    File getMetaFile() {
-      if (metaSuffix == null) {
-        return null;
-      } else if (blockSuffix == null) {
-        return new File(volume.getBasePath(), metaSuffix);
-      } else {
-        return new File(volume.getBasePath(), blockSuffix + metaSuffix);
-      }
-    }
-
-    /**
-     * Returns the block ID.
-     *
-     * @return the block ID
-     */
-    long getBlockId() {
-      return blockId;
-    }
-
-    /**
-     * Returns the volume that contains the block that this object describes.
-     *
-     * @return the volume
-     */
-    FsVolumeSpi getVolume() {
-      return volume;
-    }
-
-    @Override // Comparable
-    public int compareTo(ScanInfo b) {
-      if (blockId < b.blockId) {
-        return -1;
-      } else if (blockId == b.blockId) {
-        return 0;
-      } else {
-        return 1;
-      }
-    }
-
-    @Override // Object
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (!(o instanceof ScanInfo)) {
-        return false;
-      }
-      return blockId == ((ScanInfo) o).blockId;
-    }
-
-    @Override // Object
-    public int hashCode() {
-      return (int)(blockId^(blockId>>>32));
-    }
-
-    public long getGenStamp() {
-      return metaSuffix != null ? Block.getGenerationStamp(
-          getMetaFile().getName()) : 
-            HdfsConstants.GRANDFATHER_GENERATION_STAMP;
-    }
-  }
 
   /**
    * Create a new directory scanner, but don't cycle it running yet.
@@ -644,7 +445,7 @@ public class DirectoryScanner implements Runnable {
             // There may be multiple on-disk records for the same block, don't increment
             // the memory record pointer if so.
             ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
-            if (nextInfo.getBlockId() != info.blockId) {
+            if (nextInfo.getBlockId() != info.getBlockId()) {
               ++m;
             }
           } else {
@@ -763,19 +564,6 @@ public class DirectoryScanner implements Runnable {
   }
 
   /**
-   * Helper method to determine if a file name is consistent with a block.
-   * meta-data file
-   *
-   * @param blockId the block ID
-   * @param metaFile the file to check
-   * @return whether the file name is a block meta-data file name
-   */
-  private static boolean isBlockMetaFile(String blockId, String metaFile) {
-    return metaFile.startsWith(blockId)
-        && metaFile.endsWith(Block.METADATA_EXTENSION);
-  }
-
-  /**
    * The ReportCompiler class encapsulates the process of searching a datanode's
    * disks for block information.  It operates by performing a DFS of the
    * volume to discover block information.
@@ -784,7 +572,7 @@ public class DirectoryScanner implements Runnable {
    * ScanInfo object for it and adds that object to its report list.  The report
    * list is returned by the {@link #call()} method.
    */
-  private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
+  public class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
     private final FsVolumeSpi volume;
     private final DataNode datanode;
     // Variable for tracking time spent running for throttling purposes
@@ -816,14 +604,12 @@ public class DirectoryScanner implements Runnable {
       ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
       for (String bpid : bpList) {
         LinkedList<ScanInfo> report = new LinkedList<>();
-        File bpFinalizedDir = volume.getFinalizedDir(bpid);
 
         perfTimer.start();
         throttleTimer.start();
 
         try {
-          result.put(bpid,
-              compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
+          result.put(bpid, volume.compileReport(bpid, report, this));
         } catch (InterruptedException ex) {
           // Exit quickly and flag the scanner to do the same
           result = null;
@@ -834,106 +620,12 @@ public class DirectoryScanner implements Runnable {
     }
 
     /**
-     * Compile a list of {@link ScanInfo} for the blocks in the directory
-     * given by {@code dir}.
-     *
-     * @param vol the volume that contains the directory to scan
-     * @param bpFinalizedDir the root directory of the directory to scan
-     * @param dir the directory to scan
-     * @param report the list onto which blocks reports are placed
-     */
-    private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
-        File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
-        throws InterruptedException {
-
-      throttle();
-
-      List <String> fileNames;
-      try {
-        fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
-      } catch (IOException ioe) {
-        LOG.warn("Exception occured while compiling report: ", ioe);
-        // Initiate a check on disk failure.
-        datanode.checkDiskErrorAsync();
-        // Ignore this directory and proceed.
-        return report;
-      }
-      Collections.sort(fileNames);
-
-      /*
-       * Assumption: In the sorted list of files block file appears immediately
-       * before block metadata file. This is true for the current naming
-       * convention for block file blk_<blockid> and meta file
-       * blk_<blockid>_<genstamp>.meta
-       */
-      for (int i = 0; i < fileNames.size(); i++) {
-        // Make sure this thread can make a timely exit. With a low throttle
-        // rate, completing a run can take a looooong time.
-        if (Thread.interrupted()) {
-          throw new InterruptedException();
-        }
-
-        File file = new File(dir, fileNames.get(i));
-        if (file.isDirectory()) {
-          compileReport(vol, bpFinalizedDir, file, report);
-          continue;
-        }
-        if (!Block.isBlockFilename(file)) {
-          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
-            long blockId = Block.getBlockId(file.getName());
-            verifyFileLocation(file.getParentFile(), bpFinalizedDir,
-                blockId);
-            report.add(new ScanInfo(blockId, null, file, vol));
-          }
-          continue;
-        }
-        File blockFile = file;
-        long blockId = Block.filename2id(file.getName());
-        File metaFile = null;
-
-        // Skip all the files that start with block name until
-        // getting to the metafile for the block
-        while (i + 1 < fileNames.size()) {
-          File blkMetaFile = new File(dir, fileNames.get(i + 1));
-          if (!(blkMetaFile.isFile()
-              && blkMetaFile.getName().startsWith(blockFile.getName()))) {
-            break;
-          }
-          i++;
-          if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
-            metaFile = blkMetaFile;
-            break;
-          }
-        }
-        verifyFileLocation(blockFile, bpFinalizedDir, blockId);
-        report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
-      }
-      return report;
-    }
-
-    /**
-     * Verify whether the actual directory location of block file has the
-     * expected directory path computed using its block ID.
-     */
-    private void verifyFileLocation(File actualBlockFile,
-        File bpFinalizedDir, long blockId) {
-      File expectedBlockDir =
-          DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
-      File actualBlockDir = actualBlockFile.getParentFile();
-      if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
-        LOG.warn("Block: " + blockId +
-            " found in invalid directory.  Expected directory: " +
-            expectedBlockDir + ".  Actual directory: " + actualBlockDir);
-      }
-    }
-
-    /**
      * Called by the thread before each potential disk scan so that a pause
      * can be optionally inserted to limit the number of scans per second.
      * The limit is controlled by
      * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
      */
-    private void throttle() throws InterruptedException {
+    public void throttle() throws InterruptedException {
       accumulateTimeRunning();
 
       if ((throttleLimitMsPerSec < 1000) &&
@@ -963,7 +655,7 @@ public class DirectoryScanner implements Runnable {
     }
   }
 
-  private enum BlockDirFilter implements FilenameFilter {
+  public enum BlockDirFilter implements FilenameFilter {
     INSTANCE;
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index e7e9105..0c75001 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -500,7 +500,8 @@ public class DiskBalancer {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
-          storageIDToVolBasePathMap.put(vol.getStorageID(), vol.getBasePath());
+          storageIDToVolBasePathMap.put(vol.getStorageID(),
+              vol.getBaseURI().getPath());
         }
         references.close();
       }
@@ -1023,7 +1024,7 @@ public class DiskBalancer {
         openPoolIters(source, poolIters);
         if (poolIters.size() == 0) {
           LOG.error("No block pools found on volume. volume : {}. Exiting.",
-              source.getBasePath());
+              source.getBaseURI());
           return;
         }
 
@@ -1033,17 +1034,16 @@ public class DiskBalancer {
             // Check for the max error count constraint.
             if (item.getErrorCount() > getMaxError(item)) {
               LOG.error("Exceeded the max error count. source {}, dest: {} " +
-                      "error count: {}", source.getBasePath(),
-                  dest.getBasePath(), item.getErrorCount());
-              this.setExitFlag();
-              continue;
+                      "error count: {}", source.getBaseURI(),
+                  dest.getBaseURI(), item.getErrorCount());
+              break;
             }
 
             // Check for the block tolerance constraint.
             if (isCloseEnough(item)) {
               LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
                       "blocks.",
-                  source.getBasePath(), dest.getBasePath(),
+                  source.getBaseURI(), dest.getBaseURI(),
                   item.getBytesCopied(), item.getBlocksCopied());
               this.setExitFlag();
               continue;
@@ -1053,7 +1053,7 @@ public class DiskBalancer {
             // we are not able to find any blocks to copy.
             if (block == null) {
               LOG.error("No source blocks, exiting the copy. Source: {}, " +
-                  "Dest:{}", source.getBasePath(), dest.getBasePath());
+                  "Dest:{}", source.getBaseURI(), dest.getBaseURI());
               this.setExitFlag();
               continue;
             }
@@ -1081,14 +1081,13 @@ public class DiskBalancer {
               // exiting here.
               LOG.error("Destination volume: {} does not have enough space to" +
                   " accommodate a block. Block Size: {} Exiting from" +
-                  " copyBlocks.", dest.getBasePath(), block.getNumBytes());
-              this.setExitFlag();
-              continue;
+                  " copyBlocks.", dest.getBaseURI(), block.getNumBytes());
+              break;
             }
 
             LOG.debug("Moved block with size {} from  {} to {}",
-                block.getNumBytes(), source.getBasePath(),
-                dest.getBasePath());
+                block.getNumBytes(), source.getBaseURI(),
+                dest.getBaseURI());
 
             // Check for the max throughput constraint.
             // We sleep here to keep the promise that we will not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index cbfc9a5..58febf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index cbbafc3..dc63238 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -25,8 +25,8 @@ import java.net.URI;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.util.LightWeightResizableGSet;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
index 3162c5c..75abc1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.StringUtils;
 
+
 /**
  * Encapsulates the URI and storage medium that together describe a
  * storage directory.
@@ -37,7 +38,7 @@ import org.apache.hadoop.util.StringUtils;
  *
  */
 @InterfaceAudience.Private
-public class StorageLocation {
+public class StorageLocation implements Comparable<StorageLocation>{
   final StorageType storageType;
   final File file;
 
@@ -104,16 +105,37 @@ public class StorageLocation {
 
   @Override
   public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    } else if (obj == null || !(obj instanceof StorageLocation)) {
+    if (obj == null || !(obj instanceof StorageLocation)) {
       return false;
     }
-    return toString().equals(obj.toString());
+    int comp = compareTo((StorageLocation) obj);
+    return comp == 0;
   }
 
   @Override
   public int hashCode() {
     return toString().hashCode();
   }
+
+  @Override
+  public int compareTo(StorageLocation obj) {
+    if (obj == this) {
+      return 0;
+    } else if (obj == null) {
+      return -1;
+    }
+
+    StorageLocation otherStorage = (StorageLocation) obj;
+    if (this.getFile() != null && otherStorage.getFile() != null) {
+      return this.getFile().getAbsolutePath().compareTo(
+          otherStorage.getFile().getAbsolutePath());
+    } else if (this.getFile() == null && otherStorage.getFile() == null) {
+      return this.storageType.compareTo(otherStorage.getStorageType());
+    } else if (this.getFile() == null) {
+      return -1;
+    } else {
+      return 1;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 3416b53..1e44fb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -217,7 +217,7 @@ public class VolumeScanner extends Thread {
 
   public void printStats(StringBuilder p) {
     p.append(String.format("Block scanner information for volume %s with base" +
-        " path %s%n", volume.getStorageID(), volume.getBasePath()));
+        " path %s%n", volume.getStorageID(), volume));
     synchronized (stats) {
       p.append(String.format("Bytes verified in last hour       : %57d%n",
           stats.bytesScannedInPastHour));
@@ -253,20 +253,20 @@ public class VolumeScanner extends Thread {
 
     public void setup(VolumeScanner scanner) {
       LOG.trace("Starting VolumeScanner {}",
-          scanner.volume.getBasePath());
+          scanner.volume);
       this.scanner = scanner;
     }
 
     public void handle(ExtendedBlock block, IOException e) {
       FsVolumeSpi volume = scanner.volume;
       if (e == null) {
-        LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
+        LOG.trace("Successfully scanned {} on {}", block, volume);
         return;
       }
       // If the block does not exist anymore, then it's not an error.
       if (!volume.getDataset().contains(block)) {
         LOG.debug("Volume {}: block {} is no longer in the dataset.",
-            volume.getBasePath(), block);
+            volume, block);
         return;
       }
       // If the block exists, the exception may due to a race with write:
@@ -278,11 +278,10 @@ public class VolumeScanner extends Thread {
       if (e instanceof FileNotFoundException ) {
         LOG.info("Volume {}: verification failed for {} because of " +
                 "FileNotFoundException.  This may be due to a race with write.",
-            volume.getBasePath(), block);
+            volume, block);
         return;
       }
-      LOG.warn("Reporting bad " + block + " with volume "
-          + volume.getBasePath(), e);
+      LOG.warn("Reporting bad {} on {}", block, volume);
       try {
         scanner.datanode.reportBadBlocks(block, volume);
       } catch (IOException ie) {
@@ -305,7 +304,7 @@ public class VolumeScanner extends Thread {
       handler = new ScanResultHandler();
     }
     this.resultHandler = handler;
-    setName("VolumeScannerThread(" + volume.getBasePath() + ")");
+    setName("VolumeScannerThread(" + volume + ")");
     setDaemon(true);
   }
 
@@ -376,7 +375,7 @@ public class VolumeScanner extends Thread {
       BlockIterator iter = blockIters.get(idx);
       if (!iter.atEnd()) {
         LOG.info("Now scanning bpid {} on volume {}",
-            iter.getBlockPoolId(), volume.getBasePath());
+            iter.getBlockPoolId(), volume);
         curBlockIter = iter;
         return 0L;
       }
@@ -385,7 +384,7 @@ public class VolumeScanner extends Thread {
       if (waitMs <= 0) {
         iter.rewind();
         LOG.info("Now rescanning bpid {} on volume {}, after more than " +
-            "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(),
+            "{} hour(s)", iter.getBlockPoolId(), volume,
             TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS));
         curBlockIter = iter;
         return 0L;
@@ -416,16 +415,16 @@ public class VolumeScanner extends Thread {
           cblock.getBlockPoolId(), cblock.getBlockId());
       if (b == null) {
         LOG.info("Replica {} was not found in the VolumeMap for volume {}",
-            cblock, volume.getBasePath());
+            cblock, volume);
       } else {
         block = new ExtendedBlock(cblock.getBlockPoolId(), b);
       }
     } catch (FileNotFoundException e) {
       LOG.info("FileNotFoundException while finding block {} on volume {}",
-          cblock, volume.getBasePath());
+          cblock, volume);
     } catch (IOException e) {
       LOG.warn("I/O error while finding block {} on volume {}",
-            cblock, volume.getBasePath());
+            cblock, volume);
     }
     if (block == null) {
       return -1; // block not found.
@@ -642,7 +641,7 @@ public class VolumeScanner extends Thread {
 
   @Override
   public String toString() {
-    return "VolumeScanner(" + volume.getBasePath() +
+    return "VolumeScanner(" + volume +
         ", " + volume.getStorageID() + ")";
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index b75ed5b..f2ffa83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -206,7 +207,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @param clearFailure set true to clear the failure information about the
    *                     volumes.
    */
-  void removeVolumes(Set<File> volumes, boolean clearFailure);
+  void removeVolumes(Collection<StorageLocation> volumes, boolean clearFailure);
 
   /** @return a storage with the given storage ID */
   DatanodeStorage getStorage(final String storageUuid);
@@ -482,7 +483,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
      * Check if all the data directories are healthy
      * @return A set of unhealthy data directories.
      */
-  Set<File> checkDataDir();
+  Set<StorageLocation> checkDataDir();
 
   /**
    * Shutdown the FSDataset

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 9e16121..dbba31d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -20,10 +20,20 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.channels.ClosedChannelException;
+import java.util.LinkedList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 
 /**
  * This is an interface for the underlying volume.
@@ -48,14 +58,14 @@ public interface FsVolumeSpi {
   long getAvailable() throws IOException;
 
   /** @return the base path to the volume */
-  String getBasePath();
+  URI getBaseURI();
 
-  /** @return the path to the volume */
-  String getPath(String bpid) throws IOException;
+  DF getUsageStats(Configuration conf);
 
-  /** @return the directory for the finalized blocks in the block pool. */
-  File getFinalizedDir(String bpid) throws IOException;
-  
+  /** @return the {@link StorageLocation} to the volume */
+  StorageLocation getStorageLocation();
+
+  /** @return the {@link StorageType} of the volume */
   StorageType getStorageType();
 
   /** Returns true if the volume is NOT backed by persistent storage. */
@@ -186,4 +196,216 @@ public interface FsVolumeSpi {
    * Get the FSDatasetSpi which this volume is a part of.
    */
   FsDatasetSpi getDataset();
+
+  /**
+   * Tracks the files and other information related to a block on the disk.
+   * A missing file is indicated by setting the corresponding member
+   * to null.
+   *
+   * Because millions of these structures may be created, we try to save
+   * memory here.  So instead of storing full paths, we store path suffixes.
+   * The block file, if it exists, will have a path like this:
+   * <volume_base_path>/<block_path>
+   * So we don't need to store the volume path, since we already know what the
+   * volume is.
+   *
+   * The metadata file, if it exists, will have a path like this:
+   * <volume_base_path>/<block_path>_<genstamp>.meta
+   * So if we have a block file, there isn't any need to store the block path
+   * again.
+   *
+   * The accessor functions take care of these manipulations.
+   */
+  public static class ScanInfo implements Comparable<ScanInfo> {
+    private final long blockId;
+
+    /**
+     * The block file path, relative to the volume's base directory.
+     * If there was no block file found, this may be null. If 'vol'
+     * is null, then this is the full path of the block file.
+     */
+    private final String blockSuffix;
+
+    /**
+     * The suffix of the meta file path relative to the block file.
+     * If blockSuffix is null, then this will be the entire path relative
+     * to the volume base directory, or an absolute path if vol is also
+     * null.
+     */
+    private final String metaSuffix;
+
+    private final FsVolumeSpi volume;
+
+    /**
+     * Length of the block file, cached at construction for async block scan
+     */
+    private final long blockFileLength;
+
+    private final static Pattern CONDENSED_PATH_REGEX =
+        Pattern.compile("(?<!^)(\\\\|/){2,}");
+
+    private final static String QUOTED_FILE_SEPARATOR =
+        Matcher.quoteReplacement(File.separator);
+
+    /**
+     * Get the most condensed version of the path.
+     *
+     * For example, the condensed version of /foo//bar is /foo/bar
+     * Unlike {@link File#getCanonicalPath()}, this will never perform I/O
+     * on the filesystem.
+     *
+     * @param path the path to condense
+     * @return the condensed path
+     */
+    private static String getCondensedPath(String path) {
+      return CONDENSED_PATH_REGEX.matcher(path).
+          replaceAll(QUOTED_FILE_SEPARATOR);
+    }
+
+    /**
+     * Get a path suffix.
+     *
+     * @param f            The file to get the suffix for.
+     * @param prefix       The prefix we're stripping off.
+     *
+     * @return             A suffix such that prefix + suffix = path to f
+     */
+    private static String getSuffix(File f, String prefix) {
+      String fullPath = getCondensedPath(f.getAbsolutePath());
+      if (fullPath.startsWith(prefix)) {
+        return fullPath.substring(prefix.length());
+      }
+      throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
+    }
+
+    /**
+     * Create a ScanInfo object for a block. This constructor will examine
+     * the block data and meta-data files.
+     *
+     * @param blockId the block ID
+     * @param blockFile the path to the block data file
+     * @param metaFile the path to the block meta-data file
+     * @param vol the volume that contains the block
+     */
+    public ScanInfo(long blockId, File blockFile, File metaFile,
+        FsVolumeSpi vol) {
+      this.blockId = blockId;
+      String condensedVolPath =
+          (vol == null || vol.getBaseURI() == null) ? null :
+            getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
+      this.blockSuffix = blockFile == null ? null :
+        getSuffix(blockFile, condensedVolPath);
+      this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
+      if (metaFile == null) {
+        this.metaSuffix = null;
+      } else if (blockFile == null) {
+        this.metaSuffix = getSuffix(metaFile, condensedVolPath);
+      } else {
+        this.metaSuffix = getSuffix(metaFile,
+            condensedVolPath + blockSuffix);
+      }
+      this.volume = vol;
+    }
+
+    /**
+     * Returns the block data file.
+     *
+     * @return the block data file
+     */
+    public File getBlockFile() {
+      return (blockSuffix == null) ? null :
+        new File(new File(volume.getBaseURI()).getAbsolutePath(), blockSuffix);
+    }
+
+    /**
+     * Return the length of the data block. The length returned is the length
+     * cached when this object was created.
+     *
+     * @return the length of the data block
+     */
+    public long getBlockFileLength() {
+      return blockFileLength;
+    }
+
+    /**
+     * Returns the block meta data file or null if there isn't one.
+     *
+     * @return the block meta data file
+     */
+    public File getMetaFile() {
+      if (metaSuffix == null) {
+        return null;
+      } else if (blockSuffix == null) {
+        return new File(new File(volume.getBaseURI()).getAbsolutePath(),
+            metaSuffix);
+      } else {
+        return new File(new File(volume.getBaseURI()).getAbsolutePath(),
+            blockSuffix + metaSuffix);
+      }
+    }
+
+    /**
+     * Returns the block ID.
+     *
+     * @return the block ID
+     */
+    public long getBlockId() {
+      return blockId;
+    }
+
+    /**
+     * Returns the volume that contains the block that this object describes.
+     *
+     * @return the volume
+     */
+    public FsVolumeSpi getVolume() {
+      return volume;
+    }
+
+    @Override // Comparable
+    public int compareTo(ScanInfo b) {
+      if (blockId < b.blockId) {
+        return -1;
+      } else if (blockId == b.blockId) {
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+
+    @Override // Object
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof ScanInfo)) {
+        return false;
+      }
+      return blockId == ((ScanInfo) o).blockId;
+    }
+
+    @Override // Object
+    public int hashCode() {
+      return (int)(blockId^(blockId>>>32));
+    }
+
+    public long getGenStamp() {
+      return metaSuffix != null ? Block.getGenerationStamp(
+          getMetaFile().getName()) :
+            HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+    }
+  }
+
+  /**
+   * Compile a list of {@link ScanInfo} for the blocks in
+   * the block pool with id {@code bpid}.
+   *
+   * @param bpid block pool id to scan
+   * @param report the list onto which blocks reports are placed
+   * @param reportCompiler the {@link ReportCompiler} requesting the report
+   * @throws IOException
+   */
+  LinkedList<ScanInfo> compileReport(String bpid,
+      LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+      throws InterruptedException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index c9160cd..b9c731b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -71,8 +71,8 @@ class FsDatasetAsyncDiskService {
   private final DataNode datanode;
   private final FsDatasetImpl fsdatasetImpl;
   private final ThreadGroup threadGroup;
-  private Map<File, ThreadPoolExecutor> executors
-      = new HashMap<File, ThreadPoolExecutor>();
+  private Map<String, ThreadPoolExecutor> executors
+      = new HashMap<String, ThreadPoolExecutor>();
   private Map<String, Set<Long>> deletedBlockIds 
       = new HashMap<String, Set<Long>>();
   private static final int MAX_DELETED_BLOCKS = 64;
@@ -91,7 +91,7 @@ class FsDatasetAsyncDiskService {
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
-  private void addExecutorForVolume(final File volume) {
+  private void addExecutorForVolume(final FsVolumeImpl volume) {
     ThreadFactory threadFactory = new ThreadFactory() {
       int counter = 0;
 
@@ -115,18 +115,21 @@ class FsDatasetAsyncDiskService {
 
     // This can reduce the number of running threads
     executor.allowCoreThreadTimeOut(true);
-    executors.put(volume, executor);
+    executors.put(volume.getStorageID(), executor);
   }
 
   /**
    * Starts AsyncDiskService for a new volume
    * @param volume the root of the new data volume.
    */
-  synchronized void addVolume(File volume) {
+  synchronized void addVolume(FsVolumeImpl volume) {
     if (executors == null) {
       throw new RuntimeException("AsyncDiskService is already shutdown");
     }
-    ThreadPoolExecutor executor = executors.get(volume);
+    if (volume == null) {
+      throw new RuntimeException("Attempt to add a null volume");
+    }
+    ThreadPoolExecutor executor = executors.get(volume.getStorageID());
     if (executor != null) {
       throw new RuntimeException("Volume " + volume + " is already existed.");
     }
@@ -137,17 +140,17 @@ class FsDatasetAsyncDiskService {
    * Stops AsyncDiskService for a volume.
    * @param volume the root of the volume.
    */
-  synchronized void removeVolume(File volume) {
+  synchronized void removeVolume(String storageId) {
     if (executors == null) {
       throw new RuntimeException("AsyncDiskService is already shutdown");
     }
-    ThreadPoolExecutor executor = executors.get(volume);
+    ThreadPoolExecutor executor = executors.get(storageId);
     if (executor == null) {
-      throw new RuntimeException("Can not find volume " + volume
-          + " to remove.");
+      throw new RuntimeException("Can not find volume with storageId "
+          + storageId + " to remove.");
     } else {
       executor.shutdown();
-      executors.remove(volume);
+      executors.remove(storageId);
     }
   }
   
@@ -162,13 +165,16 @@ class FsDatasetAsyncDiskService {
   /**
    * Execute the task sometime in the future, using ThreadPools.
    */
-  synchronized void execute(File root, Runnable task) {
+  synchronized void execute(FsVolumeImpl volume, Runnable task) {
     if (executors == null) {
       throw new RuntimeException("AsyncDiskService is already shutdown");
     }
-    ThreadPoolExecutor executor = executors.get(root);
+    if (volume == null) {
+      throw new RuntimeException("A null volume does not have a executor");
+    }
+    ThreadPoolExecutor executor = executors.get(volume.getStorageID());
     if (executor == null) {
-      throw new RuntimeException("Cannot find root " + root
+      throw new RuntimeException("Cannot find volume " + volume
           + " for execution of task " + task);
     } else {
       executor.execute(task);
@@ -185,7 +191,7 @@ class FsDatasetAsyncDiskService {
     } else {
       LOG.info("Shutting down all async disk service threads");
       
-      for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
+      for (Map.Entry<String, ThreadPoolExecutor> e : executors.entrySet()) {
         e.getValue().shutdown();
       }
       // clear the executor map so that calling execute again will fail.
@@ -198,7 +204,7 @@ class FsDatasetAsyncDiskService {
   public void submitSyncFileRangeRequest(FsVolumeImpl volume,
       final FileDescriptor fd, final long offset, final long nbytes,
       final int flags) {
-    execute(volume.getCurrentDir(), new Runnable() {
+    execute(volume, new Runnable() {
       @Override
       public void run() {
         try {
@@ -220,7 +226,7 @@ class FsDatasetAsyncDiskService {
         + " replica " + replicaToDelete + " for deletion");
     ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
         volumeRef, replicaToDelete, block, trashDirectory);
-    execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
+    execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask);
   }
   
   /** A task for deleting a block file and its associated meta file, as well

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 26a2e9f..fd747bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -361,20 +361,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
       Collection<StorageLocation> dataLocations, DataStorage storage) {
-    Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
+    Set<StorageLocation> failedLocationSet = Sets.newHashSetWithExpectedSize(
         dataLocations.size());
     for (StorageLocation sl: dataLocations) {
-      failedLocationSet.add(sl.getFile().getAbsolutePath());
+      LOG.info("Adding to failedLocationSet " + sl);
+      failedLocationSet.add(sl);
     }
     for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
          it.hasNext(); ) {
       Storage.StorageDirectory sd = it.next();
-      failedLocationSet.remove(sd.getRoot().getAbsolutePath());
+      failedLocationSet.remove(sd.getStorageLocation());
+      LOG.info("Removing from failedLocationSet " + sd.getStorageLocation());
     }
     List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
         failedLocationSet.size());
     long failureDate = Time.now();
-    for (String failedStorageLocation: failedLocationSet) {
+    for (StorageLocation failedStorageLocation: failedLocationSet) {
       volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
           failureDate));
     }
@@ -403,49 +405,55 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           new DatanodeStorage(sd.getStorageUuid(),
               DatanodeStorage.State.NORMAL,
               storageType));
-      asyncDiskService.addVolume(sd.getCurrentDir());
+      asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume());
       volumes.addVolume(ref);
     }
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
       Storage.StorageDirectory sd) throws IOException {
-    final File dir = sd.getCurrentDir();
-    final StorageType storageType =
-        getStorageTypeFromLocations(dataLocations, sd.getRoot());
+    final StorageLocation storageLocation = sd.getStorageLocation();
 
     // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
     // nothing needed to be rolled back to make various data structures, e.g.,
     // storageMap and asyncDiskService, consistent.
-    FsVolumeImpl fsVolume = new FsVolumeImpl(
-        this, sd.getStorageUuid(), dir, this.conf, storageType);
+    FsVolumeImpl fsVolume = new FsVolumeImplBuilder()
+                              .setDataset(this)
+                              .setStorageID(sd.getStorageUuid())
+                              .setStorageDirectory(sd)
+                              .setConf(this.conf)
+                              .build();
     FsVolumeReference ref = fsVolume.obtainReference();
     ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
-    activateVolume(tempVolumeMap, sd, storageType, ref);
-    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+    activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
+    LOG.info("Added volume - " + storageLocation + ", StorageType: " +
+        storageLocation.getStorageType());
   }
 
   @VisibleForTesting
-  public FsVolumeImpl createFsVolume(String storageUuid, File currentDir,
-      StorageType storageType) throws IOException {
-    return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType);
+  public FsVolumeImpl createFsVolume(String storageUuid,
+      Storage.StorageDirectory sd,
+      final StorageLocation location) throws IOException {
+    return new FsVolumeImplBuilder()
+        .setDataset(this)
+        .setStorageID(storageUuid)
+        .setStorageDirectory(sd)
+        .setConf(conf)
+        .build();
   }
 
   @Override
   public void addVolume(final StorageLocation location,
       final List<NamespaceInfo> nsInfos)
       throws IOException {
-    final File dir = location.getFile();
-
     // Prepare volume in DataStorage
     final DataStorage.VolumeBuilder builder;
     try {
-      builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
+      builder = dataStorage.prepareVolume(datanode, location, nsInfos);
     } catch (IOException e) {
-      volumes.addVolumeFailureInfo(new VolumeFailureInfo(
-          location.getFile().getAbsolutePath(), Time.now()));
+      volumes.addVolumeFailureInfo(new VolumeFailureInfo(location, Time.now()));
       throw e;
     }
 
@@ -453,7 +461,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     StorageType storageType = location.getStorageType();
     final FsVolumeImpl fsVolume =
-        createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
+        createFsVolume(sd.getStorageUuid(), sd, location);
     final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
@@ -482,34 +490,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     builder.build();
     activateVolume(tempVolumeMap, sd, storageType, ref);
-    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+    LOG.info("Added volume - " + location + ", StorageType: " + storageType);
   }
 
   /**
    * Removes a set of volumes from FsDataset.
-   * @param volumesToRemove a set of absolute root path of each volume.
+   * @param storageLocationsToRemove a set of
+   * {@link StorageLocation}s for each volume.
    * @param clearFailure set true to clear failure information.
    */
   @Override
-  public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
-    // Make sure that all volumes are absolute path.
-    for (File vol : volumesToRemove) {
-      Preconditions.checkArgument(vol.isAbsolute(),
-          String.format("%s is not absolute path.", vol.getPath()));
-    }
-
+  public void removeVolumes(
+      Collection<StorageLocation> storageLocationsToRemove,
+      boolean clearFailure) {
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
-        final File absRoot = sd.getRoot().getAbsoluteFile();
-        if (volumesToRemove.contains(absRoot)) {
-          LOG.info("Removing " + absRoot + " from FsDataset.");
-
+        final StorageLocation sdLocation = sd.getStorageLocation();
+        LOG.info("Checking removing StorageLocation " +
+            sdLocation + " with id " + sd.getStorageUuid());
+        if (storageLocationsToRemove.contains(sdLocation)) {
+          LOG.info("Removing StorageLocation " + sdLocation + " with id " +
+              sd.getStorageUuid() + " from FsDataset.");
           // Disable the volume from the service.
-          asyncDiskService.removeVolume(sd.getCurrentDir());
-          volumes.removeVolume(absRoot, clearFailure);
+          asyncDiskService.removeVolume(sd.getStorageUuid());
+          volumes.removeVolume(sdLocation, clearFailure);
           volumes.waitVolumeRemoved(5000, datasetLockCondition);
 
           // Removed all replica information for the blocks on the volume.
@@ -517,12 +524,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // not scan disks.
           for (String bpid : volumeMap.getBlockPoolList()) {
             List<ReplicaInfo> blocks = new ArrayList<>();
-            for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
-                 it.hasNext(); ) {
+            for (Iterator<ReplicaInfo> it =
+                  volumeMap.replicas(bpid).iterator(); it.hasNext();) {
               ReplicaInfo block = it.next();
-              final File absBasePath =
-                  new File(block.getVolume().getBasePath()).getAbsoluteFile();
-              if (absBasePath.equals(absRoot)) {
+              final StorageLocation blockStorageLocation =
+                  block.getVolume().getStorageLocation();
+              LOG.info("checking for block " + block.getBlockId() +
+                  " with storageLocation " + blockStorageLocation);
+              if (blockStorageLocation.equals(sdLocation)) {
                 blocks.add(block);
                 it.remove();
               }
@@ -625,7 +634,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
         infos.length);
     for (VolumeFailureInfo info: infos) {
-      failedStorageLocations.add(info.getFailedStorageLocation());
+      failedStorageLocations.add(
+          info.getFailedStorageLocation().getFile().getAbsolutePath());
     }
     return failedStorageLocations.toArray(
         new String[failedStorageLocations.size()]);
@@ -663,7 +673,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long lastVolumeFailureDate = 0;
     long estimatedCapacityLostTotal = 0;
     for (VolumeFailureInfo info: infos) {
-      failedStorageLocations.add(info.getFailedStorageLocation());
+      failedStorageLocations.add(
+          info.getFailedStorageLocation().getFile().getAbsolutePath());
       long failureDate = info.getFailureDate();
       if (failureDate > lastVolumeFailureDate) {
         lastVolumeFailureDate = failureDate;
@@ -960,25 +971,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
     // Copy files to temp dir first
-    File[] blockFiles = copyBlockFiles(block.getBlockId(),
-        block.getGenerationStamp(), replicaInfo,
-        targetVolume.getTmpDir(block.getBlockPoolId()),
-        replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
-
-    ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
-        .setBlockId(replicaInfo.getBlockId())
-        .setGenerationStamp(replicaInfo.getGenerationStamp())
-        .setFsVolume(targetVolume)
-        .setDirectoryToUse(blockFiles[0].getParentFile())
-        .setBytesToReserve(0)
-        .build();
-    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block,
+        replicaInfo, smallBufferSize, conf);
+
     // Finalize the copied files
     newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       // Increment numBlocks here as this block moved without knowing to BPS
       FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
-      volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
+      volume.incrNumBlocks(block.getBlockPoolId());
     }
 
     removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
@@ -2072,7 +2073,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @return the failed volumes. Returns null if no volume failed.
    */
   @Override // FsDatasetSpi
-  public Set<File> checkDataDir() {
+  public Set<StorageLocation> checkDataDir() {
    return volumes.checkDirs();
   }
     
@@ -2250,9 +2251,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                     .setFsVolume(vol)
                     .setDirectoryToUse(diskFile.getParentFile())
                     .build();
-              ((FsVolumeImpl) vol).getBlockPoolSlice(bpid)
-                  .resolveDuplicateReplicas(
-                      memBlockInfo, diskBlockInfo, volumeMap);
+              ((FsVolumeImpl) vol).resolveDuplicateReplicas(bpid,
+                  memBlockInfo, diskBlockInfo, volumeMap);
             }
           } else {
             if (!diskFile.delete()) {
@@ -2803,15 +2803,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // Add thread for DISK volume if RamDisk is configured
     if (ramDiskConfigured &&
         asyncLazyPersistService != null &&
-        !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
-      asyncLazyPersistService.addVolume(v.getCurrentDir());
+        !asyncLazyPersistService.queryVolume(v)) {
+      asyncLazyPersistService.addVolume(v);
     }
 
     // Remove thread for DISK volume if RamDisk is not configured
     if (!ramDiskConfigured &&
         asyncLazyPersistService != null &&
-        asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
-      asyncLazyPersistService.removeVolume(v.getCurrentDir());
+        asyncLazyPersistService.queryVolume(v)) {
+      asyncLazyPersistService.removeVolume(v);
     }
   }
 
@@ -2946,11 +2946,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
           // Move the replica from lazyPersist/ to finalized/ on
           // the target volume
-          BlockPoolSlice bpSlice =
-              replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
-
           newReplicaInfo =
-              bpSlice.activateSavedReplica(replicaInfo, replicaState);
+              replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
+                  replicaInfo, replicaState);
 
           // Update the volumeMap entry.
           volumeMap.add(bpid, newReplicaInfo);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message