hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1097905 [10/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/j...
Date Fri, 29 Apr 2011 18:16:38 GMT
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java Fri Apr 29 18:16:32 2011
@@ -46,7 +46,8 @@ public class TestDFSStartupVersions exte
   /**
    * Writes an INFO log message containing the parameters.
    */
-  void log(String label, NodeType nodeType, Integer testCase, StorageInfo version) {
+  void log(String label, NodeType nodeType, Integer testCase,
+      StorageData sd) {
     String testCaseLine = "";
     if (testCase != null) {
       testCaseLine = " testCase="+testCase;
@@ -55,9 +56,26 @@ public class TestDFSStartupVersions exte
     LOG.info("***TEST*** " + label + ":"
              + testCaseLine
              + " nodeType="+nodeType
-             + " layoutVersion="+version.getLayoutVersion()
-             + " namespaceID="+version.getNamespaceID()
-             + " fsscTime="+version.getCTime());
+             + " layoutVersion="+sd.storageInfo.getLayoutVersion()
+             + " namespaceID="+sd.storageInfo.getNamespaceID()
+             + " fsscTime="+sd.storageInfo.getCTime()
+             + " clusterID="+sd.storageInfo.getClusterID()
+             + " BlockPoolID="+sd.blockPoolId);
+  }
+  
+  /**
+   * Class used for initializing version information for tests
+   */
+  private static class StorageData {
+    private final StorageInfo storageInfo;
+    private final String blockPoolId;
+    
+    StorageData(int layoutVersion, int namespaceId, String clusterId,
+        long cTime, String bpid) {
+      storageInfo = new StorageInfo(layoutVersion, namespaceId, clusterId,
+          cTime);
+      blockPoolId = bpid;
+    }
   }
   
   /**
@@ -67,7 +85,7 @@ public class TestDFSStartupVersions exte
    *    {currentNamespaceId,incorrectNamespaceId} X
    *      {pastFsscTime,currentFsscTime,futureFsscTime}
    */
-  private StorageInfo[] initializeVersions() throws Exception {
+  private StorageData[] initializeVersions() throws Exception {
     int layoutVersionOld = Storage.LAST_UPGRADABLE_LAYOUT_VERSION;
     int layoutVersionCur = UpgradeUtilities.getCurrentLayoutVersion();
     int layoutVersionNew = Integer.MIN_VALUE;
@@ -76,26 +94,54 @@ public class TestDFSStartupVersions exte
     long fsscTimeOld = Long.MIN_VALUE;
     long fsscTimeCur = UpgradeUtilities.getCurrentFsscTime(null);
     long fsscTimeNew = Long.MAX_VALUE;
+    String clusterID = "testClusterID";
+    String invalidClusterID = "testClusterID";
+    String bpid = UpgradeUtilities.getCurrentBlockPoolID(null);
+    String invalidBpid = "invalidBpid";
     
-    return new StorageInfo[] {
-      new StorageInfo(layoutVersionOld, namespaceIdCur, fsscTimeOld), // 0
-      new StorageInfo(layoutVersionOld, namespaceIdCur, fsscTimeCur), // 1
-      new StorageInfo(layoutVersionOld, namespaceIdCur, fsscTimeNew), // 2
-      new StorageInfo(layoutVersionOld, namespaceIdOld, fsscTimeOld), // 3
-      new StorageInfo(layoutVersionOld, namespaceIdOld, fsscTimeCur), // 4
-      new StorageInfo(layoutVersionOld, namespaceIdOld, fsscTimeNew), // 5
-      new StorageInfo(layoutVersionCur, namespaceIdCur, fsscTimeOld), // 6
-      new StorageInfo(layoutVersionCur, namespaceIdCur, fsscTimeCur), // 7
-      new StorageInfo(layoutVersionCur, namespaceIdCur, fsscTimeNew), // 8
-      new StorageInfo(layoutVersionCur, namespaceIdOld, fsscTimeOld), // 9
-      new StorageInfo(layoutVersionCur, namespaceIdOld, fsscTimeCur), // 10
-      new StorageInfo(layoutVersionCur, namespaceIdOld, fsscTimeNew), // 11
-      new StorageInfo(layoutVersionNew, namespaceIdCur, fsscTimeOld), // 12
-      new StorageInfo(layoutVersionNew, namespaceIdCur, fsscTimeCur), // 13
-      new StorageInfo(layoutVersionNew, namespaceIdCur, fsscTimeNew), // 14
-      new StorageInfo(layoutVersionNew, namespaceIdOld, fsscTimeOld), // 15
-      new StorageInfo(layoutVersionNew, namespaceIdOld, fsscTimeCur), // 16
-      new StorageInfo(layoutVersionNew, namespaceIdOld, fsscTimeNew), // 17
+    return new StorageData[] {
+        new StorageData(layoutVersionOld, namespaceIdCur, clusterID,
+            fsscTimeOld, bpid), // 0
+        new StorageData(layoutVersionOld, namespaceIdCur, clusterID,
+            fsscTimeCur, bpid), // 1
+        new StorageData(layoutVersionOld, namespaceIdCur, clusterID,
+            fsscTimeNew, bpid), // 2
+        new StorageData(layoutVersionOld, namespaceIdOld, clusterID,
+            fsscTimeOld, bpid), // 3
+        new StorageData(layoutVersionOld, namespaceIdOld, clusterID,
+            fsscTimeCur, bpid), // 4
+        new StorageData(layoutVersionOld, namespaceIdOld, clusterID,
+            fsscTimeNew, bpid), // 5
+        new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+            fsscTimeOld, bpid), // 6
+        new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+            fsscTimeCur, bpid), // 7
+        new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+            fsscTimeNew, bpid), // 8
+        new StorageData(layoutVersionCur, namespaceIdOld, clusterID,
+            fsscTimeOld, bpid), // 9
+        new StorageData(layoutVersionCur, namespaceIdOld, clusterID,
+            fsscTimeCur, bpid), // 10
+        new StorageData(layoutVersionCur, namespaceIdOld, clusterID,
+            fsscTimeNew, bpid), // 11
+        new StorageData(layoutVersionNew, namespaceIdCur, clusterID,
+            fsscTimeOld, bpid), // 12
+        new StorageData(layoutVersionNew, namespaceIdCur, clusterID,
+            fsscTimeCur, bpid), // 13
+        new StorageData(layoutVersionNew, namespaceIdCur, clusterID,
+            fsscTimeNew, bpid), // 14
+        new StorageData(layoutVersionNew, namespaceIdOld, clusterID,
+            fsscTimeOld, bpid), // 15
+        new StorageData(layoutVersionNew, namespaceIdOld, clusterID,
+            fsscTimeCur, bpid), // 16
+        new StorageData(layoutVersionNew, namespaceIdOld, clusterID,
+            fsscTimeNew, bpid), // 17
+        // Test with invalid clusterId
+        new StorageData(layoutVersionCur, namespaceIdCur, invalidClusterID,
+            fsscTimeCur, bpid), // 18
+        // Test with invalid block pool Id
+        new StorageData(layoutVersionCur, namespaceIdCur, clusterID,
+            fsscTimeCur, invalidBpid) // 19
     };
   }
   
@@ -106,29 +152,52 @@ public class TestDFSStartupVersions exte
    * will work together. The rules for compatibility,
    * taken from the DFS Upgrade Design, are as follows:
    * <pre>
-   * 1. The data-node does regular startup (no matter which options 
+   * <ol>
+   * <li>Check 0: If Datanode namespaceID != Namenode namespaceID, the startup fails
+   * </li>
+   * <li>Check 1: If Datanode clusterID != Namenode clusterID, the startup fails
+   * </li>
+   * <li>Check 2: If Datanode blockPoolID != Namenode blockPoolID, the startup fails
+   * </li>
+   * <li>Check 3: The data-node does regular startup (no matter which options 
    *    it is started with) if
    *       softwareLV == storedLV AND 
    *       DataNode.FSSCTime == NameNode.FSSCTime
-   * 2. The data-node performs an upgrade if it is started without any 
+   * </li>
+   * <li>Check 4: The data-node performs an upgrade if it is started without any 
    *    options and
    *       |softwareLV| > |storedLV| OR 
    *       (softwareLV == storedLV AND
    *        DataNode.FSSCTime < NameNode.FSSCTime)
-   * 3. NOT TESTED: The data-node rolls back if it is started with
+   * </li>
+   * <li>NOT TESTED: The data-node rolls back if it is started with
    *    the -rollback option and
    *       |softwareLV| >= |previous.storedLV| AND 
    *       DataNode.previous.FSSCTime <= NameNode.FSSCTime
-   * 4. In all other cases the startup fails.
+   * </li>
+   * <li>Check 5: In all other cases the startup fails.</li>
+   * </ol>
    * </pre>
    */
-  boolean isVersionCompatible(StorageInfo namenodeVer, StorageInfo datanodeVer) {
+  boolean isVersionCompatible(StorageData namenodeSd, StorageData datanodeSd) {
+    final StorageInfo namenodeVer = namenodeSd.storageInfo;
+    final StorageInfo datanodeVer = datanodeSd.storageInfo;
     // check #0
     if (namenodeVer.getNamespaceID() != datanodeVer.getNamespaceID()) {
       LOG.info("namespaceIDs are not equal: isVersionCompatible=false");
       return false;
     }
     // check #1
+    if (!namenodeVer.getClusterID().equals(datanodeVer.getClusterID())) {
+      LOG.info("clusterIDs are not equal: isVersionCompatible=false");
+      return false;
+    }
+    // check #2
+    if (!namenodeSd.blockPoolId.equals(datanodeSd.blockPoolId)) {
+      LOG.info("blockPoolIDs are not equal: isVersionCompatible=false");
+      return false;
+    }
+    // check #3
     int softwareLV = FSConstants.LAYOUT_VERSION;  // will also be Namenode's LV
     int storedLV = datanodeVer.getLayoutVersion();
     if (softwareLV == storedLV &&  
@@ -137,7 +206,7 @@ public class TestDFSStartupVersions exte
         LOG.info("layoutVersions and cTimes are equal: isVersionCompatible=true");
         return true;
       }
-    // check #2
+    // check #4
     long absSoftwareLV = Math.abs((long)softwareLV);
     long absStoredLV = Math.abs((long)storedLV);
     if (absSoftwareLV > absStoredLV ||
@@ -147,7 +216,7 @@ public class TestDFSStartupVersions exte
         LOG.info("softwareLayoutVersion is newer OR namenode cTime is newer: isVersionCompatible=true");
         return true;
       }
-    // check #4
+    // check #5
     LOG.info("default case: isVersionCompatible=false");
     return false;
   }
@@ -170,25 +239,30 @@ public class TestDFSStartupVersions exte
     UpgradeUtilities.initialize();
     Configuration conf = UpgradeUtilities.initializeStorageStateConf(1, 
                                                       new HdfsConfiguration());
-    StorageInfo[] versions = initializeVersions();
-    UpgradeUtilities.createStorageDirs(
-                                       NAME_NODE, conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY), "current");
+    StorageData[] versions = initializeVersions();
+    UpgradeUtilities.createNameNodeStorageDirs(
+        conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY), "current");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                               .format(false)
                                               .manageDataDfsDirs(false)
                                               .manageNameDfsDirs(false)
                                               .startupOption(StartupOption.REGULAR)
                                               .build();
-    StorageInfo nameNodeVersion = new StorageInfo(
-                                                  UpgradeUtilities.getCurrentLayoutVersion(),
-                                                  UpgradeUtilities.getCurrentNamespaceID(cluster),
-                                                  UpgradeUtilities.getCurrentFsscTime(cluster));
+    StorageData nameNodeVersion = new StorageData(
+        UpgradeUtilities.getCurrentLayoutVersion(),
+        UpgradeUtilities.getCurrentNamespaceID(cluster),
+        UpgradeUtilities.getCurrentClusterID(cluster),
+        UpgradeUtilities.getCurrentFsscTime(cluster),
+        UpgradeUtilities.getCurrentBlockPoolID(cluster));
+    
     log("NameNode version info", NAME_NODE, null, nameNodeVersion);
+    String bpid = UpgradeUtilities.getCurrentBlockPoolID(cluster);
     for (int i = 0; i < versions.length; i++) {
-      File[] storage = UpgradeUtilities.createStorageDirs(
-                                                          DATA_NODE, conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
+      File[] storage = UpgradeUtilities.createDataNodeStorageDirs(
+          conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
       log("DataNode version info", DATA_NODE, i, versions[i]);
-      UpgradeUtilities.createVersionFile(conf, DATA_NODE, storage, versions[i]);
+      UpgradeUtilities.createDataNodeVersionFile(storage,
+          versions[i].storageInfo, bpid, versions[i].blockPoolId);
       try {
         cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
       } catch (Exception ignore) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java Fri Apr 29 18:16:32 2011
@@ -23,7 +23,7 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.NAME_NODE;
@@ -102,88 +102,197 @@ public class TestDFSStorageStateRecovery
              + " previous="+state[1]
              + " previous.tmp="+state[2]
              + " removed.tmp="+state[3]
-             + " lastcheckpoint.tmp="+state[4]);
+             + " lastcheckpoint.tmp="+state[4]
+             + " should recover="+state[5]
+             + " current exists after="+state[6]
+             + " previous exists after="+state[7]);
   }
   
   /**
-   * Sets up the storage directories for the given node type, either
-   * dfs.name.dir or dfs.data.dir. For each element in dfs.name.dir or
-   * dfs.data.dir, the subdirectories represented by the first four elements 
-   * of the <code>state</code> array will be created and populated.
-   * See UpgradeUtilities.createStorageDirs().
+   * Sets up the storage directories for namenode as defined by
+   * dfs.name.dir. For each element in dfs.name.dir, the subdirectories 
+   * represented by the first five elements of the <code>state</code> array
+   * will be created and populated.
+   * 
+   * See {@link UpgradeUtilities#createNameNodeStorageDirs()}
    * 
-   * @param nodeType
-   *   the type of node that storage should be created for. Based on this
-   *   parameter either dfs.name.dir or dfs.data.dir is used from the global conf.
    * @param state
    *   a row from the testCases table which indicates which directories
    *   to setup for the node
-   * @return file paths representing either dfs.name.dir or dfs.data.dir
-   *   directories
+   * @return file paths representing namenode storage directories
    */
-  String[] createStorageState(NodeType nodeType, boolean[] state) throws Exception {
-    String[] baseDirs = (nodeType == NAME_NODE ?
-                         conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY) :
-                         conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+  String[] createNameNodeStorageState(boolean[] state) throws Exception {
+    String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
     UpgradeUtilities.createEmptyDirs(baseDirs);
     if (state[0])  // current
-      UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "current");
     if (state[1])  // previous
-      UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "previous");
     if (state[2])  // previous.tmp
-      UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "previous.tmp");
+      UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "previous.tmp");
     if (state[3])  // removed.tmp
-      UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "removed.tmp");
+      UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "removed.tmp");
     if (state[4])  // lastcheckpoint.tmp
-      UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "lastcheckpoint.tmp");
+      UpgradeUtilities.createNameNodeStorageDirs(baseDirs, "lastcheckpoint.tmp");
     return baseDirs;
   }
- 
+  
+  /**
+   * Sets up the storage directories for a datanode under
+   * dfs.data.dir. For each element in dfs.data.dir, the subdirectories 
+   * represented by the first five elements of the <code>state</code> array 
+   * will be created and populated. 
+   * See {@link UpgradeUtilities#createDataNodeStorageDirs()}
+   * 
+   * @param state
+   *   a row from the testCases table which indicates which directories
+   *   to setup for the node
+   * @return file paths representing datanode storage directories
+   */
+  String[] createDataNodeStorageState(boolean[] state) throws Exception {
+    String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+    UpgradeUtilities.createEmptyDirs(baseDirs);
+    if (state[0])  // current
+      UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "current");
+    if (state[1])  // previous
+      UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "previous");
+    if (state[2])  // previous.tmp
+      UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "previous.tmp");
+    if (state[3])  // removed.tmp
+      UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "removed.tmp");
+    if (state[4])  // lastcheckpoint.tmp
+      UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "lastcheckpoint.tmp");
+    return baseDirs;
+  }
+  
+  /**
+   * Sets up the storage directories for a block pool under
+   * dfs.data.dir. For each element in dfs.data.dir, the subdirectories 
+   * represented by the first five elements of the <code>state</code> array 
+   * will be created and populated. 
+   * See {@link UpgradeUtilities#createBlockPoolStorageDirs()}
+   * 
+   * @param bpid block pool Id
+   * @param state
+   *   a row from the testCases table which indicates which directories
+   *   to setup for the node
+   * @return file paths representing block pool storage directories
+   */
+  String[] createBlockPoolStorageState(String bpid, boolean[] state) throws Exception {
+    String[] baseDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+    UpgradeUtilities.createEmptyDirs(baseDirs);
+    UpgradeUtilities.createDataNodeStorageDirs(baseDirs, "current");
+    
+    // After copying the storage directories from master datanode, empty
+    // the block pool storage directories
+    String[] bpDirs = UpgradeUtilities.createEmptyBPDirs(baseDirs, bpid);
+    if (state[0]) // current
+      UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "current", bpid);
+    if (state[1]) // previous
+      UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "previous", bpid);
+    if (state[2]) // previous.tmp
+      UpgradeUtilities.createBlockPoolStorageDirs(baseDirs, "previous.tmp",
+          bpid);
+    if (state[3]) // removed.tmp
+      UpgradeUtilities
+          .createBlockPoolStorageDirs(baseDirs, "removed.tmp", bpid);
+    if (state[4]) // lastcheckpoint.tmp
+      UpgradeUtilities.createBlockPoolStorageDirs(baseDirs,
+          "lastcheckpoint.tmp", bpid);
+    return bpDirs;
+  }
+  
   /**
-   * Verify that the current and/or previous exist as indicated by 
+   * For NameNode, verify that the current and/or previous exist as indicated by 
    * the method parameters.  If previous exists, verify that
    * it hasn't been modified by comparing the checksum of all it's
    * containing files with their original checksum.  It is assumed that
    * the server has recovered.
    */
-  void checkResult(NodeType nodeType, String[] baseDirs, 
+  void checkResultNameNode(String[] baseDirs, 
                    boolean currentShouldExist, boolean previousShouldExist) 
     throws IOException
   {
-    switch (nodeType) {
-    case NAME_NODE:
-      if (currentShouldExist) {
-        for (int i = 0; i < baseDirs.length; i++) {
-          assertTrue(new File(baseDirs[i],"current").isDirectory());
-          assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
-          assertTrue(new File(baseDirs[i],"current/edits").isFile());
-          assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
-          assertTrue(new File(baseDirs[i],"current/fstime").isFile());
-        }
-      }
-      break;
-    case DATA_NODE:
-      if (currentShouldExist) {
-        for (int i = 0; i < baseDirs.length; i++) {
-          assertEquals(
-                       UpgradeUtilities.checksumContents(
-                                                         nodeType, new File(baseDirs[i],"current")),
-                       UpgradeUtilities.checksumMasterContents(nodeType));
-        }
+    if (currentShouldExist) {
+      for (int i = 0; i < baseDirs.length; i++) {
+        assertTrue(new File(baseDirs[i],"current").isDirectory());
+        assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
+        assertTrue(new File(baseDirs[i],"current/edits").isFile());
+        assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
+        assertTrue(new File(baseDirs[i],"current/fstime").isFile());
       }
-      break;
     }
     if (previousShouldExist) {
       for (int i = 0; i < baseDirs.length; i++) {
         assertTrue(new File(baseDirs[i],"previous").isDirectory());
         assertEquals(
                      UpgradeUtilities.checksumContents(
-                                                       nodeType, new File(baseDirs[i],"previous")),
-                     UpgradeUtilities.checksumMasterContents(nodeType));
+                                                       NAME_NODE, new File(baseDirs[i],"previous")),
+                     UpgradeUtilities.checksumMasterNameNodeContents());
+      }
+    }
+  }
+  
+  /**
+   * For datanode, verify that the current and/or previous exist as indicated by 
+   * the method parameters.  If previous exists, verify that
+   * it hasn't been modified by comparing the checksum of all its
+   * containing files with their original checksum.  It is assumed that
+   * the server has recovered.
+   */
+  void checkResultDataNode(String[] baseDirs, 
+                   boolean currentShouldExist, boolean previousShouldExist) 
+    throws IOException
+  {
+    if (currentShouldExist) {
+      for (int i = 0; i < baseDirs.length; i++) {
+        assertEquals(
+                     UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"current")),
+                     UpgradeUtilities.checksumMasterDataNodeContents());
+      }
+    }
+    if (previousShouldExist) {
+      for (int i = 0; i < baseDirs.length; i++) {
+        assertTrue(new File(baseDirs[i],"previous").isDirectory());
+        assertEquals(
+                     UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"previous")),
+                     UpgradeUtilities.checksumMasterDataNodeContents());
       }
     }
   }
  
+  /**
+   * For block pool, verify that the current and/or previous exist as indicated
+   * by the method parameters.  If previous exists, verify that
+   * it hasn't been modified by comparing the checksum of all its
+   * containing files with their original checksum.  It is assumed that
+   * the server has recovered.
+   * @param baseDirs directories pointing to block pool storage
+   * @param bpid block pool Id
+   * @param currentShouldExist current directory exists under storage
+   * @param previousShouldExist previous directory exists under storage
+   */
+  void checkResultBlockPool(String[] baseDirs, boolean currentShouldExist,
+      boolean previousShouldExist) throws IOException
+  {
+    if (currentShouldExist) {
+      for (int i = 0; i < baseDirs.length; i++) {
+        File bpCurDir = new File(baseDirs[i], Storage.STORAGE_DIR_CURRENT);
+        assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurDir),
+                     UpgradeUtilities.checksumMasterBlockPoolContents());
+      }
+    }
+    if (previousShouldExist) {
+      for (int i = 0; i < baseDirs.length; i++) {
+        File bpPrevDir = new File(baseDirs[i], Storage.STORAGE_DIR_PREVIOUS);
+        assertTrue(bpPrevDir.isDirectory());
+        assertEquals(
+                     UpgradeUtilities.checksumContents(DATA_NODE, bpPrevDir),
+                     UpgradeUtilities.checksumMasterBlockPoolContents());
+      }
+    }
+  }
+  
   private MiniDFSCluster createCluster(Configuration c) throws IOException {
     return new MiniDFSCluster.Builder(c)
                              .numDataNodes(0)
@@ -211,10 +320,10 @@ public class TestDFSStorageStateRecovery
         boolean prevAfterRecover = testCase[7];
 
         log("NAME_NODE recovery", numDirs, i, testCase);
-        baseDirs = createStorageState(NAME_NODE, testCase);
+        baseDirs = createNameNodeStorageState(testCase);
         if (shouldRecover) {
           cluster = createCluster(conf);
-          checkResult(NAME_NODE, baseDirs, curAfterRecover, prevAfterRecover);
+          checkResultNameNode(baseDirs, curAfterRecover, prevAfterRecover);
           cluster.shutdown();
         } else {
           try {
@@ -237,12 +346,13 @@ public class TestDFSStorageStateRecovery
   }
 
   /**
-   * This test iterates over the testCases table and attempts
-   * to startup the DataNode normally.
+   * This test iterates over the testCases table for Datanode storage and
+   * attempts to startup the DataNode normally.
    */
   public void testDNStorageStates() throws Exception {
     String[] baseDirs;
 
+    // First setup the datanode storage directory
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
       conf = new HdfsConfiguration();
       conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);      
@@ -254,21 +364,66 @@ public class TestDFSStorageStateRecovery
         boolean prevAfterRecover = testCase[7];
 
         log("DATA_NODE recovery", numDirs, i, testCase);
-        createStorageState(NAME_NODE,
-                           new boolean[] {true, true, false, false, false});
+        createNameNodeStorageState(new boolean[] { true, true, false, false,
+            false });
         cluster = createCluster(conf);
-        baseDirs = createStorageState(DATA_NODE, testCase);
+        baseDirs = createDataNodeStorageState(testCase);
         if (!testCase[0] && !testCase[1] && !testCase[2] && !testCase[3]) {
           // DataNode will create and format current if no directories exist
           cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
         } else {
           if (shouldRecover) {
             cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
-            checkResult(DATA_NODE, baseDirs, curAfterRecover, prevAfterRecover);
+            checkResultDataNode(baseDirs, curAfterRecover, prevAfterRecover);
           } else {
             try {
               cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
-              throw new AssertionError("DataNode should have failed to start");
+              assertFalse(cluster.getDataNodes().get(0).isDatanodeUp());
+            } catch (Exception expected) {
+              // expected
+            }
+          }
+        }
+        cluster.shutdown();
+      } // end testCases loop
+    } // end numDirs loop
+  }
+
+  /**
+   * This test iterates over the testCases table for block pool storage and
+   * attempts to startup the DataNode normally.
+   */
+  public void testBlockPoolStorageStates() throws Exception {
+    String[] baseDirs;
+
+    // First setup the datanode storage directory
+    String bpid = UpgradeUtilities.getCurrentBlockPoolID(null);
+    for (int numDirs = 1; numDirs <= 2; numDirs++) {
+      conf = new HdfsConfiguration();
+      conf.setInt("dfs.datanode.scan.period.hours", -1);      
+      conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
+      for (int i = 0; i < NUM_DN_TEST_CASES; i++) {
+        boolean[] testCase = testCases[i];
+        boolean shouldRecover = testCase[5];
+        boolean curAfterRecover = testCase[6];
+        boolean prevAfterRecover = testCase[7];
+
+        log("BLOCK_POOL recovery", numDirs, i, testCase);
+        createNameNodeStorageState(new boolean[] { true, true, false, false,
+            false });
+        cluster = createCluster(conf);
+        baseDirs = createBlockPoolStorageState(bpid, testCase);
+        if (!testCase[0] && !testCase[1] && !testCase[2] && !testCase[3]) {
+          // DataNode will create and format current if no directories exist
+          cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+        } else {
+          if (shouldRecover) {
+            cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+            checkResultBlockPool(baseDirs, curAfterRecover, prevAfterRecover);
+          } else {
+            try {
+              cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+              assertFalse(cluster.getDataNodes().get(0).isBPServiceAlive(bpid));
             } catch (Exception expected) {
               // expected
             }
@@ -288,13 +443,4 @@ public class TestDFSStorageStateRecovery
     LOG.info("Shutting down MiniDFSCluster");
     if (cluster != null) cluster.shutdown();
   }
-  
-  public static void main(String[] args) throws Exception {
-    TestDFSStorageStateRecovery test = new TestDFSStorageStateRecovery();
-    test.testNNStorageStates();
-    test.testDNStorageStates();
-  }
-  
-}
-
-
+}
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Fri Apr 29 18:16:32 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -61,41 +60,45 @@ public class TestDFSUpgrade {
   }
   
   /**
-   * Verify that the current and previous directories exist.  Verify that 
-   * previous hasn't been modified by comparing the checksum of all it's
-   * containing files with their original checksum.  It is assumed that
-   * the server has recovered and upgraded.
-   */
-  void checkResult(NodeType nodeType, String[] baseDirs) throws IOException {
-    switch (nodeType) {
-    case NAME_NODE:
-      for (int i = 0; i < baseDirs.length; i++) {
-        assertTrue(new File(baseDirs[i],"current").isDirectory());
-        assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
-        assertTrue(new File(baseDirs[i],"current/edits").isFile());
-        assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
-        assertTrue(new File(baseDirs[i],"current/fstime").isFile());
-      }
-      break;
-    case DATA_NODE:
-      for (int i = 0; i < baseDirs.length; i++) {
-        assertEquals(
-                     UpgradeUtilities.checksumContents(
-                                                       nodeType, new File(baseDirs[i],"current")),
-                     UpgradeUtilities.checksumMasterContents(nodeType));
-      }
-      break;
-    }
+   * For namenode, verify that the current and previous directories exist.
+   * Verify that previous hasn't been modified by comparing the checksum of all
+   * its files with their original checksum. It is assumed that the
+   * server has recovered and upgraded.
+   */
+  void checkNameNode(String[] baseDirs) throws IOException {
     for (int i = 0; i < baseDirs.length; i++) {
-      assertTrue(new File(baseDirs[i],"previous").isDirectory());
-      assertEquals(
-                   UpgradeUtilities.checksumContents(
-                                                     nodeType, new File(baseDirs[i],"previous")),
-                   UpgradeUtilities.checksumMasterContents(nodeType));
+      assertTrue(new File(baseDirs[i],"current").isDirectory());
+      assertTrue(new File(baseDirs[i],"current/VERSION").isFile());
+      assertTrue(new File(baseDirs[i],"current/edits").isFile());
+      assertTrue(new File(baseDirs[i],"current/fsimage").isFile());
+      assertTrue(new File(baseDirs[i],"current/fstime").isFile());
+      
+      File previous = new File(baseDirs[i], "previous");
+      assertTrue(previous.isDirectory());
+      assertEquals(UpgradeUtilities.checksumContents(NAME_NODE, previous),
+          UpgradeUtilities.checksumMasterNameNodeContents());
     }
   }
  
   /**
+   * For datanode, for a block pool, verify that the current and previous
+   * directories exist. Verify that previous hasn't been modified by comparing
+   * the checksum of all its files with their original checksum. It
+   * is assumed that the server has recovered and upgraded.
+   */
+  void checkDataNode(String[] baseDirs, String bpid) throws IOException {
+    for (int i = 0; i < baseDirs.length; i++) {
+      File current = new File(baseDirs[i], "current/" + bpid + "/current");
+      assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, current),
+        UpgradeUtilities.checksumMasterDataNodeContents());
+      
+      File previous = new File(baseDirs[i], "current/" + bpid + "/previous");
+      assertTrue(previous.isDirectory());
+      assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previous),
+          UpgradeUtilities.checksumMasterDataNodeContents());
+    }
+  }
+  /**
    * Attempts to start a NameNode with the given operation.  Starting
    * the NameNode should throw an exception.
    */
@@ -114,17 +117,16 @@ public class TestDFSUpgrade {
   }
   
   /**
-   * Attempts to start a DataNode with the given operation.  Starting
-   * the DataNode should throw an exception.
+   * Attempts to start a DataNode with the given operation. Starting
+   * the given block pool should fail.
+   * @param operation startup option
+   * @param bpid block pool Id that should fail to start
+   * @throws IOException 
    */
-  void startDataNodeShouldFail(StartupOption operation) {
-    try {
-      cluster.startDataNodes(conf, 1, false, operation, null); // should fail
-      throw new AssertionError("DataNode should have failed to start");
-    } catch (Exception expected) {
-      // expected
-      assertFalse(cluster.isDataNodeUp());
-    }
+  void startBlockPoolShouldFail(StartupOption operation, String bpid) throws IOException {
+    cluster.startDataNodes(conf, 1, false, operation, null); // should fail
+    assertFalse("Block pool " + bpid + " should have failed to start",
+        cluster.getDataNodes().get(0).isBPServiceAlive(bpid));
   }
  
   /**
@@ -149,6 +151,7 @@ public class TestDFSUpgrade {
     File[] baseDirs;
     UpgradeUtilities.initialize();
     
+    StorageInfo storageInfo = null;
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
       conf = new HdfsConfiguration();
       conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);      
@@ -157,67 +160,76 @@ public class TestDFSUpgrade {
       String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
       
       log("Normal NameNode upgrade", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = createCluster();
-      checkResult(NAME_NODE, nameNodeDirs);
+      checkNameNode(nameNodeDirs);
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("Normal DataNode upgrade", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = createCluster();
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
       cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
-      checkResult(DATA_NODE, dataNodeDirs);
+      checkDataNode(dataNodeDirs, UpgradeUtilities.getCurrentBlockPoolID(null));
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
       
       log("NameNode upgrade with existing previous dir", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       startNameNodeShouldFail(StartupOption.UPGRADE);
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("DataNode upgrade with existing previous dir", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = createCluster();
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
       cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
-      checkResult(DATA_NODE, dataNodeDirs);
+      checkDataNode(dataNodeDirs, UpgradeUtilities.getCurrentBlockPoolID(null));
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
 
       log("DataNode upgrade with future stored layout version in current", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = createCluster();
-      baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
-                                         new StorageInfo(Integer.MIN_VALUE,
-                                                         UpgradeUtilities.getCurrentNamespaceID(cluster),
-                                                         UpgradeUtilities.getCurrentFsscTime(cluster)));
-      startDataNodeShouldFail(StartupOption.REGULAR);
+      baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+      storageInfo = new StorageInfo(Integer.MIN_VALUE, 
+          UpgradeUtilities.getCurrentNamespaceID(cluster),
+          UpgradeUtilities.getCurrentClusterID(cluster),
+          UpgradeUtilities.getCurrentFsscTime(cluster));
+      
+      UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
+          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+      
+      startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
+          .getCurrentBlockPoolID(null));
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
       
       log("DataNode upgrade with newer fsscTime in current", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = createCluster();
-      baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
-                                         new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
-                                                         UpgradeUtilities.getCurrentNamespaceID(cluster),
-                                                         Long.MAX_VALUE));
-      startDataNodeShouldFail(StartupOption.REGULAR);
+      baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+      storageInfo = new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(), 
+          UpgradeUtilities.getCurrentNamespaceID(cluster),
+          UpgradeUtilities.getCurrentClusterID(cluster), Long.MAX_VALUE);
+          
+      UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo, 
+          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+      // Ensure corresponding block pool failed to initialize
+      startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
+          .getCurrentBlockPoolID(null));
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
 
       log("NameNode upgrade with no edits file", numDirs);
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       for (File f : baseDirs) { 
         FileUtil.fullyDelete(new File(f,"edits"));
       }
@@ -225,7 +237,7 @@ public class TestDFSUpgrade {
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode upgrade with no image file", numDirs);
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       for (File f : baseDirs) { 
         FileUtil.fullyDelete(new File(f,"fsimage")); 
       }
@@ -233,7 +245,7 @@ public class TestDFSUpgrade {
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode upgrade with corrupt version file", numDirs);
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       for (File f : baseDirs) { 
         UpgradeUtilities.corruptFile(new File(f,"VERSION")); 
       }
@@ -241,20 +253,28 @@ public class TestDFSUpgrade {
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode upgrade with old layout version in current", numDirs);
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
-                                         new StorageInfo(Storage.LAST_UPGRADABLE_LAYOUT_VERSION + 1,
-                                                         UpgradeUtilities.getCurrentNamespaceID(null),
-                                                         UpgradeUtilities.getCurrentFsscTime(null)));
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      storageInfo = new StorageInfo(Storage.LAST_UPGRADABLE_LAYOUT_VERSION + 1, 
+          UpgradeUtilities.getCurrentNamespaceID(null),
+          UpgradeUtilities.getCurrentClusterID(null),
+          UpgradeUtilities.getCurrentFsscTime(null));
+      
+      UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs, storageInfo,
+          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+      
       startNameNodeShouldFail(StartupOption.UPGRADE);
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode upgrade with future layout version in current", numDirs);
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
-                                         new StorageInfo(Integer.MIN_VALUE,
-                                                         UpgradeUtilities.getCurrentNamespaceID(null),
-                                                         UpgradeUtilities.getCurrentFsscTime(null)));
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      storageInfo = new StorageInfo(Integer.MIN_VALUE, 
+          UpgradeUtilities.getCurrentNamespaceID(null),
+          UpgradeUtilities.getCurrentClusterID(null),
+          UpgradeUtilities.getCurrentFsscTime(null));
+      
+      UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs, storageInfo,
+          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+      
       startNameNodeShouldFail(StartupOption.UPGRADE);
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
     } // end numDir loop

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java Fri Apr 29 18:16:32 2011
@@ -189,6 +189,7 @@ public class TestDFSUpgradeFromImage ext
                                   .numDataNodes(numDataNodes)
                                   .format(false)
                                   .startupOption(StartupOption.UPGRADE)
+                                  .clusterId("testClusterId")
                                   .build();
       cluster.waitActive();
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUtil.java Fri Apr 29 18:16:32 2011
@@ -28,7 +28,7 @@ import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.fs.BlockLocation;
 
@@ -43,11 +43,11 @@ public class TestDFSUtil {
     ds[0] = d;
 
     // ok
-    Block b1 = new Block(1, 1, 1);
+    ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 1, 1);
     LocatedBlock l1 = new LocatedBlock(b1, ds, 0, false);
 
     // corrupt
-    Block b2 = new Block(2, 1, 1);
+    ExtendedBlock b2 = new ExtendedBlock("bpid", 2, 1, 1);
     LocatedBlock l2 = new LocatedBlock(b2, ds, 0, true);
 
     List<LocatedBlock> ls = Arrays.asList(l1, l2);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Apr 29 18:16:32 2011
@@ -45,14 +45,15 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -146,7 +147,7 @@ public class TestDataTransferProtocol ex
     in.readFully(arr);
   }
   
-  private void writeZeroLengthPacket(Block block, String description)
+  private void writeZeroLengthPacket(ExtendedBlock block, String description)
   throws IOException {
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
@@ -167,12 +168,12 @@ public class TestDataTransferProtocol ex
     sendRecvData(description, false);
   }
   
-  private void testWrite(Block block, BlockConstructionStage stage, long newGS,
+  private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS,
       String description, Boolean eofExcepted) throws IOException {
     sendBuf.reset();
     recvBuf.reset();
-    DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0, stage, newGS,
-        block.getNumBytes(), block.getNumBytes(), "cl", null,
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0,
+        stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     if (eofExcepted) {
       ERROR.write(recvOut);
@@ -194,7 +195,9 @@ public class TestDataTransferProtocol ex
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
     try {
       cluster.waitActive();
-      datanode = cluster.getDataNodes().get(0).dnRegistration;
+      String poolId = cluster.getNamesystem().getBlockPoolId(); 
+      datanode = DataNodeTestUtils.getDNRegistrationForBP(
+          cluster.getDataNodes().get(0), poolId);
       dnAddr = NetUtils.createSocketAddr(datanode.getName());
       FileSystem fileSys = cluster.getFileSystem();
 
@@ -202,7 +205,7 @@ public class TestDataTransferProtocol ex
       Path file = new Path("dataprotocol.dat");    
       DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
       // get the first blockid for the file
-      Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+      ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
       // test PIPELINE_SETUP_CREATE on a finalized block
       testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
           "Cannot create an existing block", true);
@@ -240,8 +243,8 @@ public class TestDataTransferProtocol ex
 
       /* Test writing to a new block */
       long newBlockId = firstBlock.getBlockId() + 1;
-      Block newBlock = new Block(newBlockId, 0, 
-          firstBlock.getGenerationStamp());
+      ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(),
+          newBlockId, 0, firstBlock.getGenerationStamp());
 
       // test PIPELINE_SETUP_CREATE on a new block
       testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
@@ -337,7 +340,8 @@ public class TestDataTransferProtocol ex
     createFile(fileSys, file, fileLen);
 
     // get the first blockid for the file
-    Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+    final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+    final String poolId = firstBlock.getBlockPoolId();
     long newBlockId = firstBlock.getBlockId() + 1;
 
     recvBuf.reset();
@@ -357,7 +361,7 @@ public class TestDataTransferProtocol ex
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
-        new Block(newBlockId), 0,
+        new ExtendedBlock(poolId, newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -371,7 +375,7 @@ public class TestDataTransferProtocol ex
     sendBuf.reset();
     recvBuf.reset();
     DataTransferProtocol.Sender.opWriteBlock(sendOut,
-        new Block(++newBlockId), 0,
+        new ExtendedBlock(poolId, ++newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -395,7 +399,7 @@ public class TestDataTransferProtocol ex
     sendBuf.reset();
     recvBuf.reset();
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
-        new Block(++newBlockId), 0,
+        new ExtendedBlock(poolId, ++newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -418,7 +422,8 @@ public class TestDataTransferProtocol ex
     
     /* Test OP_READ_BLOCK */
 
-    Block blk = new Block(firstBlock);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
     long blkid = blk.getBlockId();
     // bad block id
     sendBuf.reset();

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Fri Apr 29 18:16:32 2011
@@ -30,8 +30,8 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -65,11 +65,11 @@ public class TestDatanodeBlockScanner ex
    * @throws IOException
    * @throws TimeoutException
    */
-  private static long waitForVerification(DatanodeInfo dn, FileSystem fs, 
+  private static long waitForVerification(int infoPort, FileSystem fs, 
                           Path file, int blocksValidated, 
                           long newTime, long timeout) 
   throws IOException, TimeoutException {
-    URL url = new URL("http://localhost:" + dn.getInfoPort() +
+    URL url = new URL("http://localhost:" + infoPort +
                       "/blockScannerReport?listblocks");
     long lastWarnTime = System.currentTimeMillis();
     if (newTime <= 0) newTime = 1L;
@@ -146,7 +146,8 @@ public class TestDatanodeBlockScanner ex
     /*
      * The cluster restarted. The block should be verified by now.
      */
-    assertTrue(waitForVerification(dn, fs, file1, 1, startTime, TIMEOUT) >= startTime);
+    assertTrue(waitForVerification(dn.getInfoPort(), fs, file1, 1, startTime,
+        TIMEOUT) >= startTime);
     
     /*
      * Create a new file and read the block. The block should be marked 
@@ -155,12 +156,17 @@ public class TestDatanodeBlockScanner ex
     DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
     IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), 
                       conf, true); 
-    assertTrue(waitForVerification(dn, fs, file2, 2, startTime, TIMEOUT) >= startTime);
+    assertTrue(waitForVerification(dn.getInfoPort(), fs, file2, 2, startTime,
+        TIMEOUT) >= startTime);
     
     cluster.shutdown();
   }
 
-  public void testBlockCorruptionPolicy() throws Exception {
+  public static boolean corruptReplica(ExtendedBlock blk, int replica) throws IOException {
+    return MiniDFSCluster.corruptReplica(replica, blk);
+  }
+
+  public void testBlockCorruptionPolicy() throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
     Random random = new Random();
@@ -172,13 +178,13 @@ public class TestDatanodeBlockScanner ex
     fs = cluster.getFileSystem();
     Path file1 = new Path("/tmp/testBlockVerification/file1");
     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
-    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
     
     DFSTestUtil.waitReplication(fs, file1, (short)3);
     assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
 
     // Corrupt random replica of block 
-    assertTrue(cluster.corruptReplica(block, rand));
+    assertTrue(MiniDFSCluster.corruptReplica(rand, block));
 
     // Restart the datanode hoping the corrupt block to be reported
     cluster.restartDataNode(rand);
@@ -189,9 +195,9 @@ public class TestDatanodeBlockScanner ex
   
     // Corrupt all replicas. Now, block should be marked as corrupt
     // and we should get all the replicas 
-    assertTrue(cluster.corruptReplica(block, 0));
-    assertTrue(cluster.corruptReplica(block, 1));
-    assertTrue(cluster.corruptReplica(block, 2));
+    assertTrue(MiniDFSCluster.corruptReplica(0, block));
+    assertTrue(MiniDFSCluster.corruptReplica(1, block));
+    assertTrue(MiniDFSCluster.corruptReplica(2, block));
 
     // Read the file to trigger reportBadBlocks by client
     try {
@@ -251,8 +257,7 @@ public class TestDatanodeBlockScanner ex
     FileSystem fs = cluster.getFileSystem();
     Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
     DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
-    Block blk = DFSTestUtil.getFirstBlock(fs, file1);
-    String block = blk.getBlockName();
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
 
     // Wait until block is replicated to numReplicas
     DFSTestUtil.waitReplication(fs, file1, numReplicas);
@@ -260,7 +265,7 @@ public class TestDatanodeBlockScanner ex
     // Corrupt numCorruptReplicas replicas of block 
     int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
     for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
-      if (cluster.corruptReplica(block, i)) {
+      if (corruptReplica(block, i)) {
         corruptReplicasDNIDs[j++] = i;
         LOG.info("successfully corrupted block " + block + " on node " 
                  + i + " " + cluster.getDataNodes().get(i).getSelfAddr());
@@ -281,7 +286,7 @@ public class TestDatanodeBlockScanner ex
 
     // Loop until all corrupt replicas are reported
     DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
-        blk, numCorruptReplicas);
+        block, numCorruptReplicas);
     
     // Loop until the block recovers after replication
     DFSTestUtil.waitReplication(fs, file1, numReplicas);
@@ -290,7 +295,7 @@ public class TestDatanodeBlockScanner ex
     // Make sure the corrupt replica is invalidated and removed from
     // corruptReplicasMap
     DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
-        blk, 0);
+        block, 0);
     cluster.shutdown();
   }
   
@@ -299,7 +304,6 @@ public class TestDatanodeBlockScanner ex
     final Configuration conf = new HdfsConfiguration();
     final short REPLICATION_FACTOR = (short)2;
     final Path fileName = new Path("/file1");
-    String block; //block file name
 
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
@@ -312,11 +316,12 @@ public class TestDatanodeBlockScanner ex
                                                .build();
     cluster.waitActive();
     
+    ExtendedBlock block;
     try {
       FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
       DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
-      block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
+      block = DFSTestUtil.getFirstBlock(fs, fileName);
     } finally {
       cluster.shutdown();
     }
@@ -330,8 +335,8 @@ public class TestDatanodeBlockScanner ex
     cluster.waitActive();
     try {
       FileSystem fs = cluster.getFileSystem();
-      DatanodeInfo dn = new DatanodeInfo(cluster.getDataNodes().get(0).dnRegistration);
-      assertTrue(waitForVerification(dn, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
+      int infoPort = cluster.getDataNodes().get(0).getInfoPort();
+      assertTrue(waitForVerification(infoPort, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
       
       // Truncate replica of block
       if (!changeReplicaLength(block, 0, -1)) {
@@ -373,43 +378,31 @@ public class TestDatanodeBlockScanner ex
   /**
    * Change the length of a block at datanode dnIndex
    */
-  static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
-    File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
-    for (int i=dnIndex*2; i<dnIndex*2+2; i++) {
-      File blockFile = new File(baseDir, "data" + (i+1) + 
-          MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
-      if (blockFile.exists()) {
-        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        long origLen = raFile.length();
-        raFile.setLength(origLen + lenDelta);
-        raFile.close();
-        LOG.info("assigned length " + (origLen + lenDelta) 
-            + " to block file " + blockFile.getPath()
-            + " on datanode " + dnIndex);
-        return true;
-      }
+  static boolean changeReplicaLength(ExtendedBlock blk, int dnIndex,
+      int lenDelta) throws IOException {
+    File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
+    if (blockFile != null && blockFile.exists()) {
+      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+      raFile.setLength(raFile.length()+lenDelta);
+      raFile.close();
+      return true;
     }
-    LOG.info("failed to change length of block " + blockName);
+    LOG.info("failed to change length of block " + blk);
     return false;
   }
   
-  private static void waitForBlockDeleted(String blockName, int dnIndex,
-      long timeout) 
-  throws IOException, TimeoutException, InterruptedException {
-    File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
-    File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1) + 
-        MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
-    File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2) + 
-        MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
+  private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
+      long timeout) throws IOException, TimeoutException, InterruptedException {
+    File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
     long failtime = System.currentTimeMillis() 
                     + ((timeout > 0) ? timeout : Long.MAX_VALUE);
-    while (blockFile1.exists() || blockFile2.exists()) {
+    while (blockFile != null && blockFile.exists()) {
       if (failtime < System.currentTimeMillis()) {
         throw new TimeoutException("waited too long for blocks to be deleted: "
-            + blockFile1.getPath() + (blockFile1.exists() ? " still exists; " : " is absent; ")
-            + blockFile2.getPath() + (blockFile2.exists() ? " still exists." : " is absent."));
+            + blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; "));
       }
       Thread.sleep(100);
+      blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
     }
   }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeConfig.java Fri Apr 29 18:16:32 2011
@@ -72,7 +72,7 @@ public class TestDatanodeConfig {
   @Test
   public void testDataDirectories() throws IOException {
     File dataDir = new File(BASE_DIR, "data").getCanonicalFile();
-    Configuration conf = cluster.getConfiguration();
+    Configuration conf = cluster.getConfiguration(0);
     // 1. Test unsupported schema. Only "file:" is supported.
     String dnDir = makeURI("shv", null, fileAsURI(dataDir).getPath());
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dnDir);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java Fri Apr 29 18:16:32 2011
@@ -17,14 +17,11 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.fs.Path;
 import junit.framework.TestCase;
 
 /**
@@ -41,11 +38,8 @@ public class TestDatanodeRegistration ex
   public void testChangeIpcPort() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
-    FileSystem fs = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).build();
-      fs = cluster.getFileSystem();
-
       InetSocketAddress addr = new InetSocketAddress(
         "localhost",
         cluster.getNameNodePort());
@@ -71,8 +65,7 @@ public class TestDatanodeRegistration ex
         fail("Never got a heartbeat from restarted datanode.");
       }
 
-      int realIpcPort = cluster.getDataNodes().get(0)
-        .dnRegistration.getIpcPort();
+      int realIpcPort = cluster.getDataNodes().get(0).getIpcPort();
       // Now make sure the reported IPC port is the correct one.
       assertEquals(realIpcPort, report[0].getIpcPort());
     } finally {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java Fri Apr 29 18:16:32 2011
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -27,7 +26,6 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -117,22 +115,9 @@ public class TestDecommission {
     rand.nextBytes(buffer);
     stm.write(buffer);
     stm.close();
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
   }
   
-  private void printFileLocations(FileSystem fileSys, Path name)
-  throws IOException {
-    BlockLocation[] locations = fileSys.getFileBlockLocations(
-        fileSys.getFileStatus(name), 0, fileSize);
-    for (int idx = 0; idx < locations.length; idx++) {
-      String[] loc = locations[idx].getHosts();
-      StringBuilder buf = new StringBuilder("Block[" + idx + "] : ");
-      for (int j = 0; j < loc.length; j++) {
-        buf.append(loc[j] + " ");
-      }
-      LOG.info(buf.toString());
-    }
-  }
-
   /**
    * For blocks that reside on the nodes that are down, verify that their
    * replication factor is 1 more than the specified one.
@@ -170,7 +155,7 @@ public class TestDecommission {
       }
       LOG.info("Block " + blk.getBlock() + " has " + hasdown
           + " decommissioned replica.");
-      assertEquals("Number of replicas for block" + blk.getBlock(),
+      assertEquals("Number of replicas for block " + blk.getBlock(),
                    Math.min(numDatanodes, repl+hasdown), nodes.length);  
     }
   }
@@ -181,22 +166,15 @@ public class TestDecommission {
     assertTrue(!fileSys.exists(name));
   }
 
-  private void printDatanodeReport(DatanodeInfo[] info) {
-    LOG.info("-------------------------------------------------");
-    for (int i = 0; i < info.length; i++) {
-      LOG.info(info[i].getDatanodeReport());
-      LOG.info("");
-    }
-  }
-
   /*
-   * decommission one random node.
+   * decommission one random node and wait for it to reach the
+   * given {@code waitForState}.
    */
-  private DatanodeInfo decommissionNode(FSNamesystem namesystem,
-                                  ArrayList<String>decommissionedNodes,
+  private DatanodeInfo decommissionNode(int nnIndex,
+                                  ArrayList<DatanodeInfo>decommissionedNodes,
                                   AdminStates waitForState)
     throws IOException {
-    DFSClient client = getDfsClient(cluster, conf);
+    DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf);
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
 
     //
@@ -214,11 +192,16 @@ public class TestDecommission {
     LOG.info("Decommissioning node: " + nodename);
 
     // write nodename into the exclude file.
-    ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
+    ArrayList<String> nodes = new ArrayList<String>();
+    if (decommissionedNodes != null) {
+      for (DatanodeInfo dn : decommissionedNodes) {
+        nodes.add(dn.getName());
+      }
+    }
     nodes.add(nodename);
     writeConfigFile(excludeFile, nodes);
-    namesystem.refreshNodes(conf);
-    DatanodeInfo ret = namesystem.getDatanode(info[index]);
+    cluster.getNamesystem(nnIndex).refreshNodes(conf);
+    DatanodeInfo ret = cluster.getNamesystem(nnIndex).getDatanode(info[index]);
     waitNodeState(ret, waitForState);
     return ret;
   }
@@ -239,14 +222,13 @@ public class TestDecommission {
       }
       done = state == node.getAdminState();
     }
+    LOG.info("node " + node + " reached the state " + state);
   }
   
   /* Get DFSClient to the namenode */
-  private static DFSClient getDfsClient(MiniDFSCluster cluster,
+  private static DFSClient getDfsClient(NameNode nn,
       Configuration conf) throws IOException {
-    InetSocketAddress addr = new InetSocketAddress("localhost", 
-                                                   cluster.getNameNodePort());
-    return new DFSClient(addr, conf);
+    return new DFSClient(nn.getNameNodeAddress(), conf);
   }
   
   /* Validate cluster has expected number of datanodes */
@@ -258,13 +240,15 @@ public class TestDecommission {
   
   /** Start a MiniDFSCluster 
    * @throws IOException */
-  private void startCluster(int numDatanodes, Configuration conf)
-      throws IOException {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
-        .build();
+  private void startCluster(int numNameNodes, int numDatanodes,
+      Configuration conf) throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numNameNodes(numNameNodes)
+        .numDataNodes(numDatanodes).build();
     cluster.waitActive();
-    DFSClient client = getDfsClient(cluster, conf);
-    validateCluster(client, numDatanodes);
+    for (int i = 0; i < numNameNodes; i++) {
+      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
+      validateCluster(client, numDatanodes);
+    }
   }
   
   private void verifyStats(NameNode namenode, FSNamesystem fsn,
@@ -293,90 +277,134 @@ public class TestDecommission {
   }
 
   /**
-   * Tests Decommission in DFS.
+   * Tests decommission for non federated cluster
    */
   @Test
   public void testDecommission() throws IOException {
+    testDecommission(1, 6);
+  }
+  
+  /**
+   * Test decommission for federated cluster
+   */
+  @Test
+  public void testDecommissionFederation() throws IOException {
+    testDecommission(2, 2);
+  }
+  
+  private void testDecommission(int numNamenodes, int numDatanodes)
+      throws IOException {
     LOG.info("Starting test testDecommission");
-    int numDatanodes = 6;
-    startCluster(numDatanodes, conf);
-    try {
-      DFSClient client = getDfsClient(cluster, conf);
-      FileSystem fileSys = cluster.getFileSystem();
-      FSNamesystem fsn = cluster.getNamesystem();
-      ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
-      for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
-        int replicas = numDatanodes - iteration - 1;
+    startCluster(numNamenodes, numDatanodes, conf);
+    
+    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = 
+      new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
+    for(int i = 0; i < numNamenodes; i++) {
+      namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
+    }
+    Path file1 = new Path("testDecommission.dat");
+    for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
+      int replicas = numDatanodes - iteration - 1;
+      
+      // Decommission one datanode through each namenode in turn
+      for (int i = 0; i < numNamenodes; i++) {
+        ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
+        FileSystem fileSys = cluster.getFileSystem(i);
+        writeFile(fileSys, file1, replicas);
         
         // Decommission one node. Verify that node is decommissioned.
-        Path file1 = new Path("testDecommission.dat");
-        writeFile(fileSys, file1, replicas);
-        LOG.info("Created file decommission.dat with " + replicas
-            + " replicas.");
-        printFileLocations(fileSys, file1);
-        DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
+        DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
             AdminStates.DECOMMISSIONED);
-        decommissionedNodes.add(downnode.getName());
+        decommissionedNodes.add(decomNode);
         
         // Ensure decommissioned datanode is not automatically shutdown
+        DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
         assertEquals("All datanodes must be alive", numDatanodes, 
             client.datanodeReport(DatanodeReportType.LIVE).length);
-        
-        checkFile(fileSys, file1, replicas, downnode.getName(), numDatanodes);
+        checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes);
         cleanupFile(fileSys, file1);
       }
-      
-      // Restart the cluster and ensure decommissioned datanodes
-      // are allowed to register with the namenode
-      cluster.shutdown();
-      startCluster(numDatanodes, conf);
-    } catch (IOException e) {
-      DFSClient client = getDfsClient(cluster, conf);
-      DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
-      printDatanodeReport(info);
-      throw e;
     }
+    
+    // Restart the cluster and ensure decommissioned datanodes
+    // are allowed to register with the namenode
+    cluster.shutdown();
+    startCluster(numNamenodes, numDatanodes, conf);
+  }
+  
+  /**
+   * Tests cluster storage statistics during decommissioning for non
+   * federated cluster
+   */
+  @Test
+  public void testClusterStats() throws Exception {
+    testClusterStats(1);
   }
   
   /**
-   * Tests cluster storage statistics during decommissioning
+   * Tests cluster storage statistics during decommissioning for
+   * federated cluster
    */
   @Test
-  public void testClusterStats() throws IOException, InterruptedException {
+  public void testClusterStatsFederation() throws Exception {
+    testClusterStats(3);
+  }
+  
+  public void testClusterStats(int numNameNodes) throws IOException,
+      InterruptedException {
     LOG.info("Starting test testClusterStats");
     int numDatanodes = 1;
-    startCluster(numDatanodes, conf);
-    
-    FileSystem fileSys = cluster.getFileSystem();
-    Path file = new Path("testClusterStats.dat");
-    writeFile(fileSys, file, 1);
+    startCluster(numNameNodes, numDatanodes, conf);
     
-    FSNamesystem fsn = cluster.getNamesystem();
-    NameNode namenode = cluster.getNameNode();
-    ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
-    DatanodeInfo downnode = decommissionNode(fsn, decommissionedNodes,
-        AdminStates.DECOMMISSION_INPROGRESS);
-    // Check namenode stats for multiple datanode heartbeats
-    verifyStats(namenode, fsn, downnode, true);
-    
-    // Stop decommissioning and verify stats
-    writeConfigFile(excludeFile, null);
-    fsn.refreshNodes(conf);
-    DatanodeInfo ret = fsn.getDatanode(downnode);
-    waitNodeState(ret, AdminStates.NORMAL);
-    verifyStats(namenode, fsn, ret, false);
+    for (int i = 0; i < numNameNodes; i++) {
+      FileSystem fileSys = cluster.getFileSystem(i);
+      Path file = new Path("testClusterStats.dat");
+      writeFile(fileSys, file, 1);
+      
+      FSNamesystem fsn = cluster.getNamesystem(i);
+      NameNode namenode = cluster.getNameNode(i);
+      DatanodeInfo downnode = decommissionNode(i, null,
+          AdminStates.DECOMMISSION_INPROGRESS);
+      // Check namenode stats for multiple datanode heartbeats
+      verifyStats(namenode, fsn, downnode, true);
+      
+      // Stop decommissioning and verify stats
+      writeConfigFile(excludeFile, null);
+      fsn.refreshNodes(conf);
+      DatanodeInfo ret = fsn.getDatanode(downnode);
+      waitNodeState(ret, AdminStates.NORMAL);
+      verifyStats(namenode, fsn, ret, false);
+    }
   }
   
   /**
-   * Test host file or include file functionality. Only datanodes
-   * in the include file are allowed to connect to the namenode.
+   * Test host/include file functionality. Only datanodes
+   * in the include file are allowed to connect to the namenode in a non
+   * federated cluster.
    */
   @Test
   public void testHostsFile() throws IOException, InterruptedException {
+    // Test for a single namenode cluster
+    testHostsFile(1);
+  }
+  
+  /**
+   * Test host/include file functionality. Only datanodes
+   * in the include file are allowed to connect to the namenode in a 
+   * federated cluster.
+   */
+  @Test
+  public void testHostsFileFederation() throws IOException, InterruptedException {
+    // Test for 3 namenode federated cluster
+    testHostsFile(3);
+  }
+  
+  public void testHostsFile(int numNameNodes) throws IOException,
+      InterruptedException {
     conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
     int numDatanodes = 1;
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
-        .setupHostsFile(true).build();
+    cluster = new MiniDFSCluster.Builder(conf).numNameNodes(numNameNodes)
+        .numDataNodes(numDatanodes).setupHostsFile(true).build();
     cluster.waitActive();
     
     // Now empty hosts file and ensure the datanode is disallowed
@@ -384,15 +412,18 @@ public class TestDecommission {
     ArrayList<String>list = new ArrayList<String>();
     list.add("invalidhost");
     writeConfigFile(hostsFile, list);
-    cluster.getNamesystem().refreshNodes(conf);
     
-    DFSClient client = getDfsClient(cluster, conf);
-    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
-    for (int i = 0 ; i < 5 && info.length != 0; i++) {
-      LOG.info("Waiting for datanode to be marked dead");
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
-      info = client.datanodeReport(DatanodeReportType.LIVE);
+    for (int j = 0; j < numNameNodes; j++) {
+      cluster.getNamesystem(j).refreshNodes(conf);
+      
+      DFSClient client = getDfsClient(cluster.getNameNode(j), conf);
+      DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+      for (int i = 0 ; i < 5 && info.length != 0; i++) {
+        LOG.info("Waiting for datanode to be marked dead");
+        Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+        info = client.datanodeReport(DatanodeReportType.LIVE);
+      }
+      assertEquals("Number of live nodes should be 0", 0, info.length);
     }
-    assertEquals("Number of live nodes should be 0", 0, info.length);
   }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Apr 29 18:16:32 2011
@@ -426,7 +426,7 @@ public class TestDistributedFileSystem {
         hdfs.setPermission(new Path(dir), new FsPermission((short)0));
         try {
           final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
-          final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+          final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, 0, "somegroup");
           hftp2.getFileChecksum(qualified);
           fail();
         } catch(IOException ioe) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java Fri Apr 29 18:16:32 2011
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.HardLink;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -138,8 +138,8 @@ public class TestFileAppend{
       // Create hard links for a few of the blocks
       //
       for (int i = 0; i < blocks.size(); i = i + 2) {
-        Block b = blocks.get(i).getBlock();
-        File f = dataset.getFile(b);
+        ExtendedBlock b = blocks.get(i).getBlock();
+        File f = dataset.getFile(b.getBlockPoolId(), b.getLocalBlock());
         File link = new File(f.toString() + ".link");
         System.out.println("Creating hardlink for File " + f + " to " + link);
         HardLink.createHardLink(f, link);
@@ -149,7 +149,7 @@ public class TestFileAppend{
       // Detach all blocks. This should remove hardlinks (if any)
       //
       for (int i = 0; i < blocks.size(); i++) {
-        Block b = blocks.get(i).getBlock();
+        ExtendedBlock b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned true",
             dataset.unlinkBlock(b, 1));
@@ -159,7 +159,7 @@ public class TestFileAppend{
       // return false
       //
       for (int i = 0; i < blocks.size(); i++) {
-        Block b = blocks.get(i).getBlock();
+        ExtendedBlock b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned false",
             !dataset.unlinkBlock(b, 1));

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Fri Apr 29 18:16:32 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -191,7 +192,7 @@ public class TestFileAppend3 extends jun
     final LocatedBlocks locatedblocks = fs.dfs.getNamenode().getBlockLocations(p.toString(), 0L, len1);
     assertEquals(1, locatedblocks.locatedBlockCount());
     final LocatedBlock lb = locatedblocks.get(0);
-    final Block blk = lb.getBlock();
+    final ExtendedBlock blk = lb.getBlock();
     assertEquals(len1, lb.getBlockSize());
 
     DatanodeInfo[] datanodeinfos = lb.getLocations();
@@ -260,14 +261,15 @@ public class TestFileAppend3 extends jun
     final int numblock = locatedblocks.locatedBlockCount();
     for(int i = 0; i < numblock; i++) {
       final LocatedBlock lb = locatedblocks.get(i);
-      final Block blk = lb.getBlock();
+      final ExtendedBlock blk = lb.getBlock();
       final long size = lb.getBlockSize();
       if (i < numblock - 1) {
         assertEquals(BLOCK_SIZE, size);
       }
       for(DatanodeInfo datanodeinfo : lb.getLocations()) {
         final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort());
-        final Block metainfo = dn.data.getStoredBlock(blk.getBlockId());
+        final Block metainfo = dn.data.getStoredBlock(blk.getBlockPoolId(), 
+            blk.getBlockId());
         assertEquals(size, metainfo.getNumBytes());
       }
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java Fri Apr 29 18:16:32 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -158,7 +159,7 @@ public class TestFileAppend4 {
       // Delay completeFile
       DelayAnswer delayer = new DelayAnswer();
       doAnswer(delayer).when(spyNN).complete(
-          anyString(), anyString(), (Block)anyObject());
+          anyString(), anyString(), (ExtendedBlock)anyObject());
  
       DFSClient client = new DFSClient(null, spyNN, conf, null);
       file1 = new Path("/testRecoverFinalized");
@@ -228,7 +229,8 @@ public class TestFileAppend4 {
  
       // Delay completeFile
       DelayAnswer delayer = new DelayAnswer();
-      doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), (Block)anyObject());
+      doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
+          (ExtendedBlock) anyObject());
  
       DFSClient client = new DFSClient(null, spyNN, conf, null);
       file1 = new Path("/testCompleteOtherLease");



Mime
View raw message