hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [04/43] hadoop git commit: HDFS-7035. Make adding a new data directory to the DataNode an atomic operation and improve error handling (Lei Xu via Colin P. McCabe) (cherry picked from commit a9331fe9b071fdcdae0c6c747d7b6b306142e671)
Date Fri, 14 Aug 2015 07:17:50 GMT
HDFS-7035. Make adding a new data directory to the DataNode an atomic operation and improve error handling (Lei Xu via Colin P. McCabe)
(cherry picked from commit a9331fe9b071fdcdae0c6c747d7b6b306142e671)

(cherry picked from commit ec2621e907742aad0264c5f533783f0f18565880)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d79a5849
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d79a5849
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d79a5849

Branch: refs/heads/sjlee/hdfs-merge
Commit: d79a5849999cdb0bc315938b80ed71b4f2dcb720
Parents: 3827a1a
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Thu Oct 30 17:31:23 2014 -0700
Committer: Sangjin Lee <sjlee@apache.org>
Committed: Wed Aug 12 21:31:34 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/common/Storage.java      |  15 +
 .../hadoop/hdfs/server/common/StorageInfo.java  |   4 +
 .../server/datanode/BlockPoolSliceStorage.java  | 168 +++++---
 .../hadoop/hdfs/server/datanode/DataNode.java   | 109 ++++--
 .../hdfs/server/datanode/DataStorage.java       | 382 ++++++++++---------
 .../server/datanode/fsdataset/FsDatasetSpi.java |   6 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 161 +++-----
 .../server/datanode/SimulatedFSDataset.java     |   7 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    | 108 +++++-
 .../hdfs/server/datanode/TestDataStorage.java   |  26 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |  27 +-
 11 files changed, 575 insertions(+), 438 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 735e0c1..14b52ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -821,6 +821,21 @@ public abstract class Storage extends StorageInfo {
   }
 
   /**
+   * Returns true if the storage directory on the given directory is already
+   * loaded.
+   * @param root the root directory of a {@link StorageDirectory}
+   * @throws IOException if failed to get canonical path.
+   */
+  protected boolean containsStorageDir(File root) throws IOException {
+    for (StorageDirectory sd : storageDirs) {
+      if (sd.getRoot().getCanonicalPath().equals(root.getCanonicalPath())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Return true if the layout of the given storage directory is from a version
    * of Hadoop prior to the introduction of the "current" and "previous"
    * directories which allow upgrade and rollback.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 50c8044..a3f82ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -192,6 +192,10 @@ public class StorageInfo {
     namespaceID = nsId;
   }
 
+  public void setServiceLayoutVersion(int lv) {
+    this.layoutVersion = lv;
+  }
+
   public int getServiceLayoutVersion() {
     return storageType == NodeType.DATA_NODE ? HdfsConstants.DATANODE_LAYOUT_VERSION
         : HdfsConstants.NAMENODE_LAYOUT_VERSION;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index 8333bb4..8c819a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
@@ -35,9 +36,7 @@ import org.apache.hadoop.util.Daemon;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -127,80 +126,123 @@ public class BlockPoolSliceStorage extends Storage {
         new ConcurrentHashMap<String, Boolean>());
   }
 
+  // Expose visibility for VolumeBuilder#commit().
+  public void addStorageDir(StorageDirectory sd) {
+    super.addStorageDir(sd);
+  }
+
   /**
-   * Analyze storage directories. Recover from previous transitions if required.
-   * 
+   * Load one storage directory. Recover from previous transitions if required.
+   *
+   * @param datanode datanode instance
+   * @param nsInfo namespace information
+   * @param dataDir the root path of the storage directory
+   * @param startOpt startup option
+   * @return the StorageDirectory successfully loaded.
+   * @throws IOException
+   */
+  private StorageDirectory loadStorageDirectory(DataNode datanode,
+      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException {
+    StorageDirectory sd = new StorageDirectory(dataDir, null, true);
+    try {
+      StorageState curState = sd.analyzeStorage(startOpt, this);
+      // sd is locked but not opened
+      switch (curState) {
+      case NORMAL:
+        break;
+      case NON_EXISTENT:
+        LOG.info("Block pool storage directory " + dataDir + " does not exist");
+        throw new IOException("Storage directory " + dataDir
+            + " does not exist");
+      case NOT_FORMATTED: // format
+        LOG.info("Block pool storage directory " + dataDir
+            + " is not formatted for " + nsInfo.getBlockPoolID());
+        LOG.info("Formatting ...");
+        format(sd, nsInfo);
+        break;
+      default:  // recovery part is common
+        sd.doRecover(curState);
+      }
+
+      // 2. Do transitions
+      // Each storage directory is treated individually.
+      // During startup some of them can upgrade or roll back
+      // while others could be up-to-date for the regular startup.
+      doTransition(datanode, sd, nsInfo, startOpt);
+      if (getCTime() != nsInfo.getCTime()) {
+        throw new IOException(
+            "Data-node and name-node CTimes must be the same.");
+      }
+
+      // 3. Update successfully loaded storage.
+      setServiceLayoutVersion(getServiceLayoutVersion());
+      writeProperties(sd);
+
+      return sd;
+    } catch (IOException ioe) {
+      sd.unlock();
+      throw ioe;
+    }
+  }
+
+  /**
+   * Analyze and load storage directories. Recover from previous transitions if
+   * required.
+   *
+   * The block pool storages are either all analyzed or none of them is loaded.
+   * Therefore, a failure on loading any block pool storage results a faulty
+   * data volume.
+   *
    * @param datanode Datanode to which this storage belongs to
    * @param nsInfo namespace information
    * @param dataDirs storage directories of block pool
    * @param startOpt startup option
+   * @return an array of loaded block pool directories.
    * @throws IOException on error
    */
-  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
+  List<StorageDirectory> loadBpStorageDirectories(
+      DataNode datanode, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
-    LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
-    Set<String> existingStorageDirs = new HashSet<String>();
-    for (int i = 0; i < getNumStorageDirs(); i++) {
-      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
-    }
-
-    // 1. For each BP data directory analyze the state and
-    // check whether all is consistent before transitioning.
-    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
-        dataDirs.size());
-    for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
-      File dataDir = it.next();
-      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
-        LOG.info("Storage directory " + dataDir + " has already been used.");
-        it.remove();
-        continue;
-      }
-      StorageDirectory sd = new StorageDirectory(dataDir, null, true);
-      StorageState curState;
-      try {
-        curState = sd.analyzeStorage(startOpt, this);
-        // sd is locked but not opened
-        switch (curState) {
-        case NORMAL:
-          break;
-        case NON_EXISTENT:
-          // ignore this storage
-          LOG.info("Storage directory " + dataDir + " does not exist.");
-          it.remove();
-          continue;
-        case NOT_FORMATTED: // format
-          LOG.info("Storage directory " + dataDir + " is not formatted.");
-          LOG.info("Formatting ...");
-          format(sd, nsInfo);
-          break;
-        default: // recovery part is common
-          sd.doRecover(curState);
+    List<StorageDirectory> succeedDirs = Lists.newArrayList();
+    try {
+      for (File dataDir : dataDirs) {
+        if (containsStorageDir(dataDir)) {
+          throw new IOException(
+              "BlockPoolSliceStorage.recoverTransitionRead: " +
+                  "attempt to load an used block storage: " + dataDir);
         }
-      } catch (IOException ioe) {
-        sd.unlock();
-        throw ioe;
+        StorageDirectory sd =
+            loadStorageDirectory(datanode, nsInfo, dataDir, startOpt);
+        succeedDirs.add(sd);
       }
-      // add to the storage list. This is inherited from parent class, Storage.
-      addStorageDir(sd);
-      dataDirStates.add(curState);
+    } catch (IOException e) {
+      LOG.warn("Failed to analyze storage directories for block pool "
+          + nsInfo.getBlockPoolID(), e);
+      throw e;
     }
+    return succeedDirs;
+  }
 
-    if (dataDirs.size() == 0) // none of the data dirs exist
-      throw new IOException(
-          "All specified directories are not accessible or do not exist.");
-
-    // 2. Do transitions
-    // Each storage directory is treated individually.
-    // During startup some of them can upgrade or roll back
-    // while others could be up-to-date for the regular startup.
-    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-      assert getCTime() == nsInfo.getCTime() 
-          : "Data-node and name-node CTimes must be the same.";
+  /**
+   * Analyze storage directories. Recover from previous transitions if required.
+   *
+   * The block pool storages are either all analyzed or none of them is loaded.
+   * Therefore, a failure on loading any block pool storage results a faulty
+   * data volume.
+   *
+   * @param datanode Datanode to which this storage belongs to
+   * @param nsInfo namespace information
+   * @param dataDirs storage directories of block pool
+   * @param startOpt startup option
+   * @throws IOException on error
+   */
+  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
+      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+    LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+    for (StorageDirectory sd : loadBpStorageDirectories(
+        datanode, nsInfo, dataDirs, startOpt)) {
+      addStorageDir(sd);
     }
-
-    // 3. Update all storages. Some of them might have just been formatted.
-    this.writeAll();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index badb845..3dc0c3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -44,12 +44,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.BufferedOutputStream;
@@ -81,6 +77,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
@@ -184,7 +184,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.tracing.TraceAdminPB;
 import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
 import org.apache.hadoop.tracing.TraceAdminProtocolPB;
 import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@@ -453,20 +452,27 @@ public class DataNode extends ReconfigurableBase
    */
   @VisibleForTesting
   static class ChangedVolumes {
+    /** The storage locations of the newly added volumes. */
     List<StorageLocation> newLocations = Lists.newArrayList();
+    /** The storage locations of the volumes that are removed. */
     List<StorageLocation> deactivateLocations = Lists.newArrayList();
+    /** The unchanged locations that existed in the old configuration. */
+    List<StorageLocation> unchangedLocations = Lists.newArrayList();
   }
 
   /**
    * Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect
    * changed volumes.
+   * @param newVolumes a comma separated string that specifies the data volumes.
    * @return changed volumes.
    * @throws IOException if none of the directories are specified in the
    * configuration.
    */
   @VisibleForTesting
-  ChangedVolumes parseChangedVolumes() throws IOException {
-    List<StorageLocation> locations = getStorageLocations(getConf());
+  ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
+    List<StorageLocation> locations = getStorageLocations(conf);
 
     if (locations.isEmpty()) {
       throw new IOException("No directory is specified.");
@@ -481,9 +487,11 @@ public class DataNode extends ReconfigurableBase
       boolean found = false;
       for (Iterator<StorageLocation> sl = results.newLocations.iterator();
            sl.hasNext(); ) {
-        if (sl.next().getFile().getCanonicalPath().equals(
+        StorageLocation location = sl.next();
+        if (location.getFile().getCanonicalPath().equals(
             dir.getRoot().getCanonicalPath())) {
           sl.remove();
+          results.unchangedLocations.add(location);
           found = true;
           break;
         }
@@ -501,18 +509,21 @@ public class DataNode extends ReconfigurableBase
   /**
    * Attempts to reload data volumes with new configuration.
    * @param newVolumes a comma separated string that specifies the data volumes.
-   * @throws Exception
+   * @throws IOException on error. If an IOException is thrown, some new volumes
+   * may have been successfully added and removed.
    */
-  private synchronized void refreshVolumes(String newVolumes) throws Exception {
+  private synchronized void refreshVolumes(String newVolumes) throws IOException {
     Configuration conf = getConf();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-    List<StorageLocation> locations = getStorageLocations(conf);
-
-    final int numOldDataDirs = dataDirs.size();
-    dataDirs = locations;
-    ChangedVolumes changedVolumes = parseChangedVolumes();
 
+    int numOldDataDirs = dataDirs.size();
+    ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
     StringBuilder errorMessageBuilder = new StringBuilder();
+    List<String> effectiveVolumes = Lists.newArrayList();
+    for (StorageLocation sl : changedVolumes.unchangedLocations) {
+      effectiveVolumes.add(sl.toString());
+    }
+
     try {
       if (numOldDataDirs + changedVolumes.newLocations.size() -
           changedVolumes.deactivateLocations.size() <= 0) {
@@ -523,34 +534,43 @@ public class DataNode extends ReconfigurableBase
             Joiner.on(",").join(changedVolumes.newLocations));
 
         // Add volumes for each Namespace
+        final List<NamespaceInfo> nsInfos = Lists.newArrayList();
         for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-          NamespaceInfo nsInfo = bpos.getNamespaceInfo();
-          LOG.info("Loading volumes for namesapce: " + nsInfo.getNamespaceID());
-          storage.addStorageLocations(
-              this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP);
+          nsInfos.add(bpos.getNamespaceInfo());
         }
-        List<String> bpids = Lists.newArrayList();
-        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-          bpids.add(bpos.getBlockPoolId());
+        ExecutorService service = Executors.newFixedThreadPool(
+            changedVolumes.newLocations.size());
+        List<Future<IOException>> exceptions = Lists.newArrayList();
+        for (final StorageLocation location : changedVolumes.newLocations) {
+          exceptions.add(service.submit(new Callable<IOException>() {
+            @Override
+            public IOException call() {
+              try {
+                data.addVolume(location, nsInfos);
+              } catch (IOException e) {
+                return e;
+              }
+              return null;
+            }
+          }));
         }
-        List<StorageLocation> succeedVolumes =
-            data.addVolumes(changedVolumes.newLocations, bpids);
-
-        if (succeedVolumes.size() < changedVolumes.newLocations.size()) {
-          List<StorageLocation> failedVolumes = Lists.newArrayList();
-          // Clean all failed volumes.
-          for (StorageLocation location : changedVolumes.newLocations) {
-            if (!succeedVolumes.contains(location)) {
-              errorMessageBuilder.append("FAILED TO ADD:");
-              failedVolumes.add(location);
+
+        for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
+          StorageLocation volume = changedVolumes.newLocations.get(i);
+          Future<IOException> ioExceptionFuture = exceptions.get(i);
+          try {
+            IOException ioe = ioExceptionFuture.get();
+            if (ioe != null) {
+              errorMessageBuilder.append(String.format("FAILED TO ADD: %s: %s\n",
+                  volume.toString(), ioe.getMessage()));
             } else {
-              errorMessageBuilder.append("ADDED:");
+              effectiveVolumes.add(volume.toString());
             }
-            errorMessageBuilder.append(location);
-            errorMessageBuilder.append("\n");
+            LOG.info("Storage directory is loaded: " + volume.toString());
+          } catch (Exception e) {
+            errorMessageBuilder.append(String.format("FAILED to ADD: %s: %s\n",
+                volume.toString(), e.getMessage()));
           }
-          storage.removeVolumes(failedVolumes);
-          data.removeVolumes(failedVolumes);
         }
       }
 
@@ -559,15 +579,20 @@ public class DataNode extends ReconfigurableBase
             Joiner.on(",").join(changedVolumes.deactivateLocations));
 
         data.removeVolumes(changedVolumes.deactivateLocations);
-        storage.removeVolumes(changedVolumes.deactivateLocations);
+        try {
+          storage.removeVolumes(changedVolumes.deactivateLocations);
+        } catch (IOException e) {
+          errorMessageBuilder.append(e.getMessage());
+        }
       }
 
       if (errorMessageBuilder.length() > 0) {
         throw new IOException(errorMessageBuilder.toString());
       }
-    } catch (IOException e) {
-      LOG.warn("There is IOException when refresh volumes! ", e);
-      throw e;
+    } finally {
+      conf.set(DFS_DATANODE_DATA_DIR_KEY,
+          Joiner.on(",").join(effectiveVolumes));
+      dataDirs = getStorageLocations(conf);
     }
   }
 
@@ -1309,7 +1334,7 @@ public class DataNode extends ReconfigurableBase
       final String bpid = nsInfo.getBlockPoolID();
       //read storage info, lock data dirs and transition fs state if necessary
       synchronized (this) {
-        storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+        storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
       }
       final StorageInfo bpStorage = storage.getBPStorage(bpid);
       LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 21a3236..8863724 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -192,176 +195,215 @@ public class DataStorage extends Storage {
     }
     return null;
   }
-  
+
   /**
-   * {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}}
+   * VolumeBuilder holds the metadata (e.g., the storage directories) of the
+   * prepared volume returned from {@link prepareVolume()}. Calling {@link build()}
+   * to add the metadata to {@link DataStorage} so that this prepared volume can
+   * be active.
    */
-  private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
-    this.layoutVersion = getServiceLayoutVersion();
-    for (StorageDirectory dir : dirs) {
-      writeProperties(dir);
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static public class VolumeBuilder {
+    private DataStorage storage;
+    /** Volume level storage directory. */
+    private StorageDirectory sd;
+    /** Mapping from block pool ID to an array of storage directories. */
+    private Map<String, List<StorageDirectory>> bpStorageDirMap =
+        Maps.newHashMap();
+
+    @VisibleForTesting
+    public VolumeBuilder(DataStorage storage, StorageDirectory sd) {
+      this.storage = storage;
+      this.sd = sd;
+    }
+
+    public final StorageDirectory getStorageDirectory() {
+      return this.sd;
+    }
+
+    private void addBpStorageDirectories(String bpid,
+        List<StorageDirectory> dirs) {
+      bpStorageDirMap.put(bpid, dirs);
+    }
+
+    /**
+     * Add loaded metadata of a data volume to {@link DataStorage}.
+     */
+    public void build() {
+      assert this.sd != null;
+      synchronized (storage) {
+        for (Map.Entry<String, List<StorageDirectory>> e :
+            bpStorageDirMap.entrySet()) {
+          final String bpid = e.getKey();
+          BlockPoolSliceStorage bpStorage = this.storage.bpStorageMap.get(bpid);
+          assert bpStorage != null;
+          for (StorageDirectory bpSd : e.getValue()) {
+            bpStorage.addStorageDir(bpSd);
+          }
+        }
+        storage.addStorageDir(sd);
+      }
+    }
+  }
+
+  private StorageDirectory loadStorageDirectory(DataNode datanode,
+      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
+      throws IOException {
+    StorageDirectory sd = new StorageDirectory(dataDir, null, false);
+    try {
+      StorageState curState = sd.analyzeStorage(startOpt, this);
+      // sd is locked but not opened
+      switch (curState) {
+      case NORMAL:
+        break;
+      case NON_EXISTENT:
+        LOG.info("Storage directory " + dataDir + " does not exist");
+        throw new IOException("Storage directory " + dataDir
+            + " does not exist");
+      case NOT_FORMATTED: // format
+        LOG.info("Storage directory " + dataDir + " is not formatted for "
+            + nsInfo.getBlockPoolID());
+        LOG.info("Formatting ...");
+        format(sd, nsInfo, datanode.getDatanodeUuid());
+        break;
+      default:  // recovery part is common
+        sd.doRecover(curState);
+      }
+
+      // 2. Do transitions
+      // Each storage directory is treated individually.
+      // During startup some of them can upgrade or roll back
+      // while others could be up-to-date for the regular startup.
+      doTransition(datanode, sd, nsInfo, startOpt);
+
+      // 3. Update successfully loaded storage.
+      setServiceLayoutVersion(getServiceLayoutVersion());
+      writeProperties(sd);
+
+      return sd;
+    } catch (IOException ioe) {
+      sd.unlock();
+      throw ioe;
     }
   }
 
   /**
-   * Add a list of volumes to be managed by DataStorage. If the volume is empty,
-   * format it, otherwise recover it from previous transitions if required.
+   * Prepare a storage directory. It creates a builder which can be used to add
+   * to the volume. If the volume cannot be added, it is OK to discard the
+   * builder later.
    *
-   * @param datanode the reference to DataNode.
-   * @param nsInfo namespace information
-   * @param dataDirs array of data storage directories
-   * @param startOpt startup option
-   * @throws IOException
+   * @param datanode DataNode object.
+   * @param volume the root path of a storage directory.
+   * @param nsInfos an array of namespace infos.
+   * @return a VolumeBuilder that holds the metadata of this storage directory
+   * and can be added to DataStorage later.
+   * @throws IOException if encounters I/O errors.
+   *
+   * Note that if there is IOException, the state of DataStorage is not modified.
    */
-  synchronized void addStorageLocations(DataNode datanode,
-      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
-      StartupOption startOpt)
-      throws IOException {
-    // Similar to recoverTransitionRead, it first ensures the datanode level
-    // format is completed.
-    List<StorageLocation> tmpDataDirs =
-        new ArrayList<StorageLocation>(dataDirs);
-    addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
-
-    Collection<File> bpDataDirs = new ArrayList<File>();
-    String bpid = nsInfo.getBlockPoolID();
-    for (StorageLocation dir : dataDirs) {
-      File dnRoot = dir.getFile();
-      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
-          STORAGE_DIR_CURRENT));
-      bpDataDirs.add(bpRoot);
-    }
-    // mkdir for the list of BlockPoolStorage
-    makeBlockPoolDataDir(bpDataDirs, null);
-    BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
-    if (bpStorage == null) {
-      bpStorage = new BlockPoolSliceStorage(
-          nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
-          nsInfo.getClusterID());
+  public VolumeBuilder prepareVolume(DataNode datanode, File volume,
+      List<NamespaceInfo> nsInfos) throws IOException {
+    if (containsStorageDir(volume)) {
+      final String errorMessage = "Storage directory is in use";
+      LOG.warn(errorMessage + ".");
+      throw new IOException(errorMessage);
     }
 
-    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
-    addBlockPoolStorage(bpid, bpStorage);
+    StorageDirectory sd = loadStorageDirectory(
+        datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP);
+    VolumeBuilder builder =
+        new VolumeBuilder(this, sd);
+    for (NamespaceInfo nsInfo : nsInfos) {
+      List<File> bpDataDirs = Lists.newArrayList();
+      bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(
+          nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT)));
+      makeBlockPoolDataDir(bpDataDirs, null);
+
+      BlockPoolSliceStorage bpStorage;
+      final String bpid = nsInfo.getBlockPoolID();
+      synchronized (this) {
+        bpStorage = this.bpStorageMap.get(bpid);
+        if (bpStorage == null) {
+          bpStorage = new BlockPoolSliceStorage(
+              nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+              nsInfo.getClusterID());
+          addBlockPoolStorage(bpid, bpStorage);
+        }
+      }
+      builder.addBpStorageDirectories(
+          bpid, bpStorage.loadBpStorageDirectories(
+              datanode, nsInfo, bpDataDirs, StartupOption.HOTSWAP));
+    }
+    return builder;
   }
 
   /**
-   * Add a list of volumes to be managed by this DataStorage. If the volume is
-   * empty, it formats the volume, otherwise it recovers it from previous
-   * transitions if required.
-   *
-   * If isInitialize is false, only the directories that have finished the
-   * doTransition() process will be added into DataStorage.
+   * Add a list of volumes to be managed by DataStorage. If the volume is empty,
+   * format it, otherwise recover it from previous transitions if required.
    *
    * @param datanode the reference to DataNode.
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
-   * @param isInitialize whether it is called when DataNode starts up.
+   * @return a list of successfully loaded volumes.
    * @throws IOException
    */
-  private synchronized void addStorageLocations(DataNode datanode,
+  @VisibleForTesting
+  synchronized List<StorageLocation> addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
-      StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
-      throws IOException {
-    Set<String> existingStorageDirs = new HashSet<String>();
-    for (int i = 0; i < getNumStorageDirs(); i++) {
-      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
-    }
-
-    // 1. For each data directory calculate its state and check whether all is
-    // consistent before transitioning. Format and recover.
-    ArrayList<StorageState> dataDirStates =
-        new ArrayList<StorageState>(dataDirs.size());
-    List<StorageDirectory> addedStorageDirectories =
-        new ArrayList<StorageDirectory>();
-    for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
-      File dataDir = it.next().getFile();
-      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
-        LOG.info("Storage directory " + dataDir + " has already been used.");
-        it.remove();
-        continue;
-      }
-      StorageDirectory sd = new StorageDirectory(dataDir);
-      StorageState curState;
-      try {
-        curState = sd.analyzeStorage(startOpt, this);
-        // sd is locked but not opened
-        switch (curState) {
-        case NORMAL:
-          break;
-        case NON_EXISTENT:
-          // ignore this storage
-          LOG.info("Storage directory " + dataDir + " does not exist");
-          it.remove();
+      StartupOption startOpt) throws IOException {
+    final String bpid = nsInfo.getBlockPoolID();
+    List<StorageLocation> successVolumes = Lists.newArrayList();
+    for (StorageLocation dataDir : dataDirs) {
+      File root = dataDir.getFile();
+      if (!containsStorageDir(root)) {
+        try {
+          // It first ensures the datanode level format is completed.
+          StorageDirectory sd = loadStorageDirectory(
+              datanode, nsInfo, root, startOpt);
+          addStorageDir(sd);
+        } catch (IOException e) {
+          LOG.warn(e);
           continue;
-        case NOT_FORMATTED: // format
-          LOG.info("Storage directory " + dataDir + " is not formatted for "
-            + nsInfo.getBlockPoolID());
-          LOG.info("Formatting ...");
-          format(sd, nsInfo, datanode.getDatanodeUuid());
-          break;
-        default:  // recovery part is common
-          sd.doRecover(curState);
         }
-      } catch (IOException ioe) {
-        sd.unlock();
-        LOG.warn("Ignoring storage directory " + dataDir
-            + " due to an exception", ioe);
-        //continue with other good dirs
-        continue;
-      }
-      if (isInitialize) {
-        addStorageDir(sd);
+      } else {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
       }
-      addedStorageDirectories.add(sd);
-      dataDirStates.add(curState);
-    }
 
-    if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
-      // none of the data dirs exist
-      if (ignoreExistingDirs) {
-        return;
-      }
-      throw new IOException(
-          "All specified directories are not accessible or do not exist.");
-    }
-
-    // 2. Do transitions
-    // Each storage directory is treated individually.
-    // During startup some of them can upgrade or rollback
-    // while others could be up-to-date for the regular startup.
-    for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
-        it.hasNext(); ) {
-      StorageDirectory sd = it.next();
+      List<File> bpDataDirs = new ArrayList<File>();
+      bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, new File(root,
+              STORAGE_DIR_CURRENT)));
       try {
-        doTransition(datanode, sd, nsInfo, startOpt);
-        createStorageID(sd);
-      } catch (IOException e) {
-        if (!isInitialize) {
-          sd.unlock();
-          it.remove();
-          continue;
+        makeBlockPoolDataDir(bpDataDirs, null);
+        BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+        if (bpStorage == null) {
+          bpStorage = new BlockPoolSliceStorage(
+              nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+              nsInfo.getClusterID());
         }
-        unlockAll();
-        throw e;
-      }
-    }
-
-    // 3. Update all successfully loaded storages. Some of them might have just
-    // been formatted.
-    this.writeAll(addedStorageDirectories);
 
-    // 4. Make newly loaded storage directories visible for service.
-    if (!isInitialize) {
-      this.storageDirs.addAll(addedStorageDirectories);
+        bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+        addBlockPoolStorage(bpid, bpStorage);
+      } catch (IOException e) {
+        LOG.warn("Failed to add storage for block pool: " + bpid + " : "
+            + e.getMessage());
+        continue;
+      }
+      successVolumes.add(dataDir);
     }
+    return successVolumes;
   }
 
   /**
-   * Remove volumes from DataStorage.
+   * Remove volumes from DataStorage. All volumes are removed even when the
+   * IOException is thrown.
+   *
    * @param locations a collection of volumes.
+   * @throws IOException if I/O error when unlocking storage directory.
    */
-  synchronized void removeVolumes(Collection<StorageLocation> locations) {
+  synchronized void removeVolumes(Collection<StorageLocation> locations)
+      throws IOException {
     if (locations.isEmpty()) {
       return;
     }
@@ -375,6 +417,7 @@ public class DataStorage extends Storage {
       bpsStorage.removeVolumes(dataDirs);
     }
 
+    StringBuilder errorMsgBuilder = new StringBuilder();
     for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
          it.hasNext(); ) {
       StorageDirectory sd = it.next();
@@ -386,13 +429,18 @@ public class DataStorage extends Storage {
           LOG.warn(String.format(
             "I/O error attempting to unlock storage directory %s.",
             sd.getRoot()), e);
+          errorMsgBuilder.append(String.format("Failed to remove %s: %s\n",
+              sd.getRoot(), e.getMessage()));
         }
       }
     }
+    if (errorMsgBuilder.length() > 0) {
+      throw new IOException(errorMsgBuilder.toString());
+    }
   }
 
   /**
-   * Analyze storage directories.
+   * Analyze storage directories for a specific block pool.
    * Recover from previous transitions if required.
    * Perform fs state transition if necessary depending on the namespace info.
    * Read storage info.
@@ -400,60 +448,25 @@ public class DataStorage extends Storage {
    * This method should be synchronized between multiple DN threads.  Only the
    * first DN thread does DN level storage dir recoverTransitionRead.
    *
-   * @param nsInfo namespace information
-   * @param dataDirs array of data storage directories
-   * @param startOpt startup option
-   * @throws IOException
-   */
-  synchronized void recoverTransitionRead(DataNode datanode,
-      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
-      StartupOption startOpt)
-      throws IOException {
-    if (initialized) {
-      // DN storage has been initialized, no need to do anything
-      return;
-    }
-    LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
-        + " and NameNode layout version: " + nsInfo.getLayoutVersion());
-
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-    addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
-    
-    // mark DN storage is initialized
-    this.initialized = true;
-  }
-
-  /**
-   * recoverTransitionRead for a specific block pool
-   * 
    * @param datanode DataNode
-   * @param bpID Block pool Id
    * @param nsInfo Namespace info of namenode corresponding to the block pool
    * @param dataDirs Storage directories
    * @param startOpt startup option
    * @throws IOException on error
    */
-  void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
+  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
       Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
-    // First ensure datanode level format/snapshot/rollback is completed
-    recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
-
-    // Create list of storage directories for the block pool
-    Collection<File> bpDataDirs = new ArrayList<File>();
-    for(StorageLocation dir : dataDirs) {
-      File dnRoot = dir.getFile();
-      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
-          STORAGE_DIR_CURRENT));
-      bpDataDirs.add(bpRoot);
+    if (this.initialized) {
+      LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+          + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+      this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+      // mark DN storage is initialized
+      this.initialized = true;
     }
 
-    // mkdir for the list of BlockPoolStorage
-    makeBlockPoolDataDir(bpDataDirs, null);
-    BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
-        nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
-    
-    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
-    addBlockPoolStorage(bpID, bpStorage);
+    if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
+      throw new IOException("All specified directories are failed to load.");
+    }
   }
 
   /**
@@ -669,12 +682,15 @@ public class DataStorage extends Storage {
     // meaningful at BlockPoolSliceStorage level. 
 
     // regular start up. 
-    if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION)
+    if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION) {
+      createStorageID(sd);
       return; // regular startup
+    }
     
     // do upgrade
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
       doUpgrade(datanode, sd, nsInfo);  // upgrade
+      createStorageID(sd);
       return;
     }
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a9ab973..3ab4ffd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -100,8 +101,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public List<V> getVolumes();
 
   /** Add an array of StorageLocation to FsDataset. */
-  public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
-      final Collection<String> bpids);
+  public void addVolume(
+      final StorageLocation location,
+      final List<NamespaceInfo> nsInfos) throws IOException;
 
   /** Removes a collection of volumes from FsDataset. */
   public void removeVolumes(Collection<StorageLocation> volumes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e046ecd..e7fa6d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -31,7 +31,6 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -95,6 +94,7 @@ import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskRepli
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
@@ -308,30 +308,39 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
-    volumeMap.addAll(tempVolumeMap);
-    volumes.addVolume(fsVolume);
-    storageMap.put(sd.getStorageUuid(),
-        new DatanodeStorage(sd.getStorageUuid(),
-            DatanodeStorage.State.NORMAL,
-            storageType));
-    asyncDiskService.addVolume(sd.getCurrentDir());
+    synchronized (this) {
+      volumeMap.addAll(tempVolumeMap);
+      storageMap.put(sd.getStorageUuid(),
+          new DatanodeStorage(sd.getStorageUuid(),
+              DatanodeStorage.State.NORMAL,
+              storageType));
+      asyncDiskService.addVolume(sd.getCurrentDir());
+      volumes.addVolume(fsVolume);
+    }
 
     LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
   }
 
-  private void addVolumeAndBlockPool(Collection<StorageLocation> dataLocations,
-      Storage.StorageDirectory sd, final Collection<String> bpids)
+  @Override
+  public void addVolume(final StorageLocation location,
+      final List<NamespaceInfo> nsInfos)
       throws IOException {
-    final File dir = sd.getCurrentDir();
-    final StorageType storageType =
-        getStorageTypeFromLocations(dataLocations, sd.getRoot());
+    final File dir = location.getFile();
 
+    // Prepare volume in DataStorage
+    DataStorage.VolumeBuilder builder =
+        dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
+
+    final Storage.StorageDirectory sd = builder.getStorageDirectory();
+
+    StorageType storageType = location.getStorageType();
     final FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
+    ArrayList<IOException> exceptions = Lists.newArrayList();
 
-    List<IOException> exceptions = Lists.newArrayList();
-    for (final String bpid : bpids) {
+    for (final NamespaceInfo nsInfo : nsInfos) {
+      String bpid = nsInfo.getBlockPoolID();
       try {
         fsVolume.addBlockPool(bpid, this.conf);
         fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
@@ -342,89 +351,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
     if (!exceptions.isEmpty()) {
-      // The states of FsDatasteImpl are not modified, thus no need to rolled back.
       throw MultipleIOException.createIOException(exceptions);
     }
 
-    volumeMap.addAll(tempVolumeMap);
-    storageMap.put(sd.getStorageUuid(),
-        new DatanodeStorage(sd.getStorageUuid(),
-            DatanodeStorage.State.NORMAL,
-            storageType));
-    asyncDiskService.addVolume(sd.getCurrentDir());
-    volumes.addVolume(fsVolume);
-
-    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
-  }
-
-  /**
-   * Add an array of StorageLocation to FsDataset.
-   *
-   * @pre dataStorage must have these volumes.
-   * @param volumes an array of storage locations for adding volumes.
-   * @param bpids block pool IDs.
-   * @return an array of successfully loaded volumes.
-   */
-  @Override
-  public synchronized List<StorageLocation> addVolumes(
-      final List<StorageLocation> volumes, final Collection<String> bpids) {
-    final Collection<StorageLocation> dataLocations =
-        DataNode.getStorageLocations(this.conf);
-    final Map<String, Storage.StorageDirectory> allStorageDirs =
-        new HashMap<String, Storage.StorageDirectory>();
-    List<StorageLocation> succeedVolumes = Lists.newArrayList();
-    try {
-      for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
-        Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
-        allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd);
-      }
-    } catch (IOException ioe) {
-      LOG.warn("Caught exception when parsing storage URL.", ioe);
-      return succeedVolumes;
-    }
-
-    final boolean[] successFlags = new boolean[volumes.size()];
-    Arrays.fill(successFlags, false);
-    List<Thread> volumeAddingThreads = Lists.newArrayList();
-    for (int i = 0; i < volumes.size(); i++) {
-      final int idx = i;
-      Thread t = new Thread() {
-        public void run() {
-          StorageLocation vol = volumes.get(idx);
-          try {
-            String key = vol.getFile().getCanonicalPath();
-            if (!allStorageDirs.containsKey(key)) {
-              LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
-            } else {
-              addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key),
-                  bpids);
-              successFlags[idx] = true;
-            }
-          } catch (IOException e) {
-            LOG.warn("Caught exception when adding volume " + vol, e);
-          }
-        }
-      };
-      volumeAddingThreads.add(t);
-      t.start();
-    }
-
-    for (Thread t : volumeAddingThreads) {
-      try {
-        t.join();
-      } catch (InterruptedException e) {
-        LOG.warn("Caught InterruptedException when adding volume.", e);
-      }
-    }
+    setupAsyncLazyPersistThread(fsVolume);
 
-    setupAsyncLazyPersistThreads();
-
-    for (int i = 0; i < volumes.size(); i++) {
-      if (successFlags[i]) {
-        succeedVolumes.add(volumes.get(i));
-      }
+    builder.build();
+    synchronized (this) {
+      volumeMap.addAll(tempVolumeMap);
+      storageMap.put(sd.getStorageUuid(),
+          new DatanodeStorage(sd.getStorageUuid(),
+              DatanodeStorage.State.NORMAL,
+              storageType));
+      asyncDiskService.addVolume(sd.getCurrentDir());
+      volumes.addVolume(fsVolume);
     }
-    return succeedVolumes;
+    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
   }
 
   /**
@@ -2477,24 +2419,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   // added or removed.
   // This should only be called when the FsDataSetImpl#volumes list is finalized.
   private void setupAsyncLazyPersistThreads() {
-    boolean ramDiskConfigured = ramDiskConfigured();
     for (FsVolumeImpl v: getVolumes()){
-      // Skip transient volumes
-      if (v.isTransientStorage()) {
-        continue;
-      }
+      setupAsyncLazyPersistThread(v);
+    }
+  }
 
-      // Add thread for DISK volume if RamDisk is configured
-      if (ramDiskConfigured &&
-          !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
-        asyncLazyPersistService.addVolume(v.getCurrentDir());
-      }
+  private void setupAsyncLazyPersistThread(final FsVolumeImpl v) {
+    // Skip transient volumes
+    if (v.isTransientStorage()) {
+      return;
+    }
+    boolean ramDiskConfigured = ramDiskConfigured();
+    // Add thread for DISK volume if RamDisk is configured
+    if (ramDiskConfigured &&
+        !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+      asyncLazyPersistService.addVolume(v.getCurrentDir());
+    }
 
-      // Remove thread for DISK volume if RamDisk is not configured
-      if (!ramDiskConfigured &&
-          asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
-        asyncLazyPersistService.removeVolume(v.getCurrentDir());
-      }
+    // Remove thread for DISK volume if RamDisk is not configured
+    if (!ramDiskConfigured &&
+        asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+      asyncLazyPersistService.removeVolume(v.getCurrentDir());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 46cb46a..3e5034a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
@@ -1194,8 +1196,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
-      final Collection<String> bpids) {
+  public void addVolume(
+      final StorageLocation location,
+      final List<NamespaceInfo> nsInfos) throws IOException {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index f6e984b..7ed0421 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.BlockLocation;
@@ -32,6 +34,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -56,9 +60,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -185,10 +194,11 @@ public class TestDataNodeHotSwapVolumes {
     }
     assertFalse(oldLocations.isEmpty());
 
-    String newPaths = "/foo/path1,/foo/path2";
-    conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths);
+    String newPaths = oldLocations.get(0).getFile().getAbsolutePath() +
+        ",/foo/path1,/foo/path2";
 
-    DataNode.ChangedVolumes changedVolumes =dn.parseChangedVolumes();
+    DataNode.ChangedVolumes changedVolumes =
+        dn.parseChangedVolumes(newPaths);
     List<StorageLocation> newVolumes = changedVolumes.newLocations;
     assertEquals(2, newVolumes.size());
     assertEquals(new File("/foo/path1").getAbsolutePath(),
@@ -197,21 +207,21 @@ public class TestDataNodeHotSwapVolumes {
       newVolumes.get(1).getFile().getAbsolutePath());
 
     List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
-    assertEquals(oldLocations.size(), removedVolumes.size());
-    for (int i = 0; i < removedVolumes.size(); i++) {
-      assertEquals(oldLocations.get(i).getFile(),
-          removedVolumes.get(i).getFile());
-    }
+    assertEquals(1, removedVolumes.size());
+    assertEquals(oldLocations.get(1).getFile(),
+        removedVolumes.get(0).getFile());
+
+    assertEquals(1, changedVolumes.unchangedLocations.size());
+    assertEquals(oldLocations.get(0).getFile(),
+        changedVolumes.unchangedLocations.get(0).getFile());
   }
 
   @Test
   public void testParseChangedVolumesFailures() throws IOException {
     startDFSCluster(1, 1);
     DataNode dn = cluster.getDataNodes().get(0);
-    Configuration conf = dn.getConf();
     try {
-      conf.set(DFS_DATANODE_DATA_DIR_KEY, "");
-      dn.parseChangedVolumes();
+      dn.parseChangedVolumes("");
       fail("Should throw IOException: empty inputs.");
     } catch (IOException e) {
       GenericTestUtils.assertExceptionContains("No directory is specified.", e);
@@ -219,7 +229,8 @@ public class TestDataNodeHotSwapVolumes {
   }
 
   /** Add volumes to the first DataNode. */
-  private void addVolumes(int numNewVolumes) throws ReconfigurationException {
+  private void addVolumes(int numNewVolumes)
+      throws ReconfigurationException, IOException {
     File dataDir = new File(cluster.getDataDirectory());
     DataNode dn = cluster.getDataNodes().get(0);  // First DataNode.
     Configuration conf = dn.getConf();
@@ -241,12 +252,26 @@ public class TestDataNodeHotSwapVolumes {
       newVolumeDirs.add(volumeDir);
       volumeDir.mkdirs();
       newDataDirBuf.append(",");
-      newDataDirBuf.append(volumeDir.toURI());
+      newDataDirBuf.append(
+          StorageLocation.parse(volumeDir.toString()).toString());
     }
 
     String newDataDir = newDataDirBuf.toString();
     dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir);
-    assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY));
+
+    // Verify the configuration value is appropriately set.
+    String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
+    String[] expectDataDirs = newDataDir.split(",");
+    assertEquals(expectDataDirs.length, effectiveDataDirs.length);
+    for (int i = 0; i < expectDataDirs.length; i++) {
+      StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]);
+      StorageLocation effectiveLocation =
+          StorageLocation.parse(effectiveDataDirs[i]);
+      assertEquals(expectLocation.getStorageType(),
+          effectiveLocation.getStorageType());
+      assertEquals(expectLocation.getFile().getCanonicalFile(),
+          effectiveLocation.getFile().getCanonicalFile());
+    }
 
     // Check that all newly created volumes are appropriately formatted.
     for (File volumeDir : newVolumeDirs) {
@@ -426,7 +451,7 @@ public class TestDataNodeHotSwapVolumes {
     dn.reconfigurePropertyImpl(
         DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
     assertFileLocksReleased(
-      new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
+        new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
 
     // Force DataNode to report missing blocks.
     dn.scheduleAllBlockReport(0);
@@ -439,6 +464,59 @@ public class TestDataNodeHotSwapVolumes {
     DFSTestUtil.waitReplication(fs, testFile, replFactor);
   }
 
+  @Test
+  public void testAddVolumeFailures() throws IOException {
+    startDFSCluster(1, 1);
+    final String dataDir = cluster.getDataDirectory();
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    List<String> newDirs = Lists.newArrayList();
+    final int NUM_NEW_DIRS = 4;
+    for (int i = 0; i < NUM_NEW_DIRS; i++) {
+      File newVolume = new File(dataDir, "new_vol" + i);
+      newDirs.add(newVolume.toString());
+      if (i % 2 == 0) {
+        // Make addVolume() fail.
+        newVolume.createNewFile();
+      }
+    }
+
+    String newValue = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY) + "," +
+        Joiner.on(",").join(newDirs);
+    try {
+      dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newValue);
+      fail("Expect to throw IOException.");
+    } catch (ReconfigurationException e) {
+      String errorMessage = e.getCause().getMessage();
+      String messages[] = errorMessage.split("\\r?\\n");
+      assertEquals(2, messages.length);
+      assertThat(messages[0], containsString("new_vol0"));
+      assertThat(messages[1], containsString("new_vol2"));
+    }
+
+    // Make sure that vol0 and vol2's metadata are not left in memory.
+    FsDatasetSpi<?> dataset = dn.getFSDataset();
+    for (FsVolumeSpi volume : dataset.getVolumes()) {
+      assertThat(volume.getBasePath(), is(not(anyOf(
+          is(newDirs.get(0)), is(newDirs.get(2))))));
+    }
+    DataStorage storage = dn.getStorage();
+    for (int i = 0; i < storage.getNumStorageDirs(); i++) {
+      Storage.StorageDirectory sd = storage.getStorageDir(i);
+      assertThat(sd.getRoot().toString(),
+          is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
+    }
+
+    // The newly effective conf does not have vol0 and vol2.
+    String[] effectiveVolumes =
+        dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(",");
+    assertEquals(4, effectiveVolumes.length);
+    for (String ev : effectiveVolumes) {
+      assertThat(new File(ev).getCanonicalPath(),
+          is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
+    }
+  }
+
   /**
    * Asserts that the storage lock file in each given directory has been
    * released.  This method works by trying to acquire the lock file itself.  If

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
index ed32243..c90b8e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
@@ -146,14 +146,11 @@ public class TestDataStorage {
     assertEquals(numLocations, storage.getNumStorageDirs());
 
     locations = createStorageLocations(numLocations);
-    try {
-      storage.addStorageLocations(mockDN, namespaceInfos.get(0),
-          locations, START_OPT);
-      fail("Expected to throw IOException: adding active directories.");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains(
-          "All specified directories are not accessible or do not exist.", e);
-    }
+    List<StorageLocation> addedLocation =
+        storage.addStorageLocations(mockDN, namespaceInfos.get(0),
+            locations, START_OPT);
+    assertTrue(addedLocation.isEmpty());
+
     // The number of active storage dirs has not changed, since it tries to
     // add the storage dirs that are under service.
     assertEquals(numLocations, storage.getNumStorageDirs());
@@ -169,13 +166,12 @@ public class TestDataStorage {
     final int numLocations = 3;
     List<StorageLocation> locations =
         createStorageLocations(numLocations, true);
-
     try {
       storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
       fail("An IOException should throw: all StorageLocations are NON_EXISTENT");
     } catch (IOException e) {
       GenericTestUtils.assertExceptionContains(
-          "All specified directories are not accessible or do not exist.", e);
+          "All specified directories are failed to load.", e);
     }
     assertEquals(0, storage.getNumStorageDirs());
   }
@@ -191,9 +187,9 @@ public class TestDataStorage {
       throws IOException {
     final int numLocations = 3;
     List<StorageLocation> locations = createStorageLocations(numLocations);
-    String bpid = nsInfo.getBlockPoolID();
     // Prepare volumes
-    storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+    storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
+    assertEquals(numLocations, storage.getNumStorageDirs());
 
     // Reset DataStorage
     storage.unlockAll();
@@ -201,11 +197,11 @@ public class TestDataStorage {
     // Trigger an exception from doTransition().
     nsInfo.clusterID = "cluster1";
     try {
-      storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+      storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
       fail("Expect to throw an exception from doTransition()");
     } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e);
+      GenericTestUtils.assertExceptionContains("All specified directories", e);
     }
-    assertEquals(numLocations, storage.getNumStorageDirs());
+    assertEquals(0, storage.getNumStorageDirs());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d79a5849/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 9a8ce9c..28951c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
@@ -42,7 +44,6 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -52,7 +53,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -68,6 +71,7 @@ public class TestFsDatasetImpl {
       new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
 
   private Configuration conf;
+  private DataNode datanode;
   private DataStorage storage;
   private DataBlockScanner scanner;
   private FsDatasetImpl dataset;
@@ -98,7 +102,7 @@ public class TestFsDatasetImpl {
 
   @Before
   public void setUp() throws IOException {
-    final DataNode datanode = Mockito.mock(DataNode.class);
+    datanode = Mockito.mock(DataNode.class);
     storage = Mockito.mock(DataStorage.class);
     scanner = Mockito.mock(DataBlockScanner.class);
     this.conf = new Configuration();
@@ -123,17 +127,24 @@ public class TestFsDatasetImpl {
     final int numNewVolumes = 3;
     final int numExistingVolumes = dataset.getVolumes().size();
     final int totalVolumes = numNewVolumes + numExistingVolumes;
-    List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
     Set<String> expectedVolumes = new HashSet<String>();
+    List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    for (String bpid : BLOCK_POOL_IDS) {
+      nsInfos.add(new NamespaceInfo(0, "cluster-id", bpid, 1));
+    }
     for (int i = 0; i < numNewVolumes; i++) {
       String path = BASE_DIR + "/newData" + i;
-      newLocations.add(StorageLocation.parse(path));
-      when(storage.getStorageDir(numExistingVolumes + i))
-          .thenReturn(createStorageDirectory(new File(path)));
+      StorageLocation loc = StorageLocation.parse(path);
+      Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+      DataStorage.VolumeBuilder builder =
+          new DataStorage.VolumeBuilder(storage, sd);
+      when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+          anyListOf(NamespaceInfo.class)))
+          .thenReturn(builder);
+
+      dataset.addVolume(loc, nsInfos);
     }
-    when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
 
-    dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS));
     assertEquals(totalVolumes, dataset.getVolumes().size());
     assertEquals(totalVolumes, dataset.storageMap.size());
 


Mime
View raw message