hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1099687 [6/15] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/pro...
Date: Thu, 05 May 2011 05:40:13 GMT
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java Thu May  5 05:40:07 2011
@@ -50,11 +50,11 @@ public interface DataNodeMXBean {
   public String getHttpPort();
   
   /**
-   * Gets the namenode IP address.
+   * Gets the namenode IP addresses
    * 
-   * @return the namenode IP address
+   * @return the namenode IP addresses that the datanode is talking to
    */
-  public String getNamenodeAddress();
+  public String getNamenodeAddresses();
   
   /**
    * Gets the information of each volume on the Datanode. Please
@@ -63,4 +63,12 @@ public interface DataNodeMXBean {
    * @return the volume info
    */
   public String getVolumeInfo();
+  
+  /**
+   * Gets the cluster id.
+   * 
+   * @return the cluster id
+   */
+  public String getClusterId();
+  
 }
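
The interface change above replaces the single getNamenodeAddress() attribute with getNamenodeAddresses() and adds getClusterId(), reflecting that a federated datanode now talks to several namenodes within one cluster. As a minimal sketch of reading the new attributes over JMX (the "Hadoop:service=DataNode,name=DataNodeInfo" object name is an assumption, not part of this patch):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class DataNodeInfoDump {
      public static void main(String[] args) throws Exception {
        // In-process lookup; a remote datanode would need a JMXConnector instead.
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed object name; the registered name may differ per release.
        ObjectName dn = new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
        // MXBean getters surface as attributes: getNamenodeAddresses() -> "NamenodeAddresses".
        System.out.println("Namenode addresses: " + mbs.getAttribute(dn, "NamenodeAddresses"));
        System.out.println("Cluster id: " + mbs.getAttribute(dn, "ClusterId"));
        System.out.println("Volume info: " + mbs.getAttribute(dn, "VolumeInfo"));
      }
    }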

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Thu May  5 05:40:07 2011
@@ -26,13 +26,22 @@ import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
@@ -43,6 +52,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
 
 /** 
  * Data storage information file.
@@ -55,20 +65,32 @@ public class DataStorage extends Storage
   final static String BLOCK_SUBDIR_PREFIX = "subdir";
   final static String BLOCK_FILE_PREFIX = "blk_";
   final static String COPY_FILE_PREFIX = "dncp_";
-  final static String STORAGE_DIR_RBW = "rbw";
-  final static String STORAGE_DIR_FINALIZED = "finalized";
   final static String STORAGE_DIR_DETACHED = "detach";
+  public final static String STORAGE_DIR_RBW = "rbw";
+  public final static String STORAGE_DIR_FINALIZED = "finalized";
+  public final static String STORAGE_DIR_TMP = "tmp";
+
+  private static final Pattern PRE_GENSTAMP_META_FILE_PATTERN = 
+    Pattern.compile("(.*blk_[-]*\\d+)\\.meta$");
   
+  /** Access to this variable is guarded by "this" */
   private String storageID;
 
+  // flag to ensure initializing storage occurs only once
+  private boolean initilized = false;
+  
+  // bpStorageMap maps block pool IDs to their BlockPoolSliceStorage instances
+  private Map<String, BlockPoolSliceStorage> bpStorageMap
+    = new HashMap<String, BlockPoolSliceStorage>();
+
+
   DataStorage() {
     super(NodeType.DATA_NODE);
     storageID = "";
   }
   
-  DataStorage(int nsID, long cT, String strgID) {
-    super(NodeType.DATA_NODE, nsID, cT);
-    this.storageID = strgID;
+  public StorageInfo getBPStorage(String bpid) {
+    return bpStorageMap.get(bpid);
   }
   
   public DataStorage(StorageInfo storageInfo, String strgID) {
@@ -76,36 +98,51 @@ public class DataStorage extends Storage
     this.storageID = strgID;
   }
 
-  public String getStorageID() {
+  synchronized String getStorageID() {
     return storageID;
   }
   
-  void setStorageID(String newStorageID) {
+  synchronized void setStorageID(String newStorageID) {
     this.storageID = newStorageID;
   }
   
+  synchronized void createStorageID() {
+    if (storageID != null && !storageID.isEmpty()) {
+      return;
+    }
+    storageID = DataNode.createNewStorageId();
+  }
+  
   /**
    * Analyze storage directories.
    * Recover from previous transitions if required. 
    * Perform fs state transition if necessary depending on the namespace info.
-   * Read storage info. 
+   * Read storage info.
+   * <br>
+   * This method should be synchronized between multiple DN threads.  Only the 
+   * first DN thread does DN level storage dir recoverTransitionRead.
    * 
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
    * @throws IOException
    */
-  void recoverTransitionRead(NamespaceInfo nsInfo,
+  synchronized void recoverTransitionRead(NamespaceInfo nsInfo,
                              Collection<File> dataDirs,
                              StartupOption startOpt
                              ) throws IOException {
+    if (initilized) {
+      // DN storage has been initialized, no need to do anything
+      return;
+    }
     assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
-      "Data-node and name-node layout versions must be the same.";
+      "Data-node version " + FSConstants.LAYOUT_VERSION + 
+      " and name-node layout version " + nsInfo.getLayoutVersion() + 
+      " must be the same.";
     
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
     // Format and recover.
-    this.storageID = "";
     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
     for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
@@ -142,52 +179,148 @@ public class DataStorage extends Storage
 
     if (dataDirs.size() == 0)  // none of the data dirs exist
       throw new IOException(
-                            "All specified directories are not accessible or do not exist.");
+          "All specified directories are not accessible or do not exist.");
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During sturtup some of them can upgrade or rollback 
+    // During startup some of them can upgrade or rollback 
     // while others could be uptodate for the regular startup.
     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
       doTransition(getStorageDir(idx), nsInfo, startOpt);
       assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
         "Data-node and name-node layout versions must be the same.";
-      assert this.getCTime() == nsInfo.getCTime() :
-        "Data-node and name-node CTimes must be the same.";
     }
     
+    // make sure we have storage id set - if not - generate new one
+    createStorageID();
+    
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
+    
+    // 4. mark DN storage as initialized
+    this.initilized = true;
+  }
+
+  /**
+   * recoverTransitionRead for a specific block pool
+   * 
+   * @param bpID Block pool Id
+   * @param nsInfo Namespace info of namenode corresponding to the block pool
+   * @param dataDirs Storage directories
+   * @param startOpt startup option
+   * @throws IOException on error
+   */
+  void recoverTransitionRead(String bpID, NamespaceInfo nsInfo,
+      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+    // First ensure datanode level format/snapshot/rollback is completed
+    recoverTransitionRead(nsInfo, dataDirs, startOpt);
+    
+    // Create list of storage directories for the block pool
+    Collection<File> bpDataDirs = new ArrayList<File>();
+    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
+      File dnRoot = it.next();
+      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
+          STORAGE_DIR_CURRENT));
+      bpDataDirs.add(bpRoot);
+    }
+    // mkdir for the list of BlockPoolStorage
+    makeBlockPoolDataDir(bpDataDirs, null);
+    BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
+        nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
+    
+    bpStorage.recoverTransitionRead(nsInfo, bpDataDirs, startOpt);
+    addBlockPoolStorage(bpID, bpStorage);
+  }
+
+  /**
+   * Create physical directory for block pools on the data node
+   * 
+   * @param dataDirs
+   *          List of data directories
+   * @param conf
+   *          Configuration instance to use.
+   * @throws IOException on errors
+   */
+  static void makeBlockPoolDataDir(Collection<File> dataDirs,
+      Configuration conf) throws IOException {
+    if (conf == null)
+      conf = new HdfsConfiguration();
+
+    LocalFileSystem localFS = FileSystem.getLocal(conf);
+    FsPermission permission = new FsPermission(conf.get(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
+    for (File data : dataDirs) {
+      try {
+        DiskChecker.checkDir(localFS, new Path(data.toURI()), permission);
+      } catch ( IOException e ) {
+        LOG.warn("Invalid directory in: " + data.getCanonicalPath() + ": "
+            + e.getMessage());
+      }
+    }
   }
 
   void format(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
     sd.clearDirectory(); // create directory
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.clusterID = nsInfo.getClusterID();
     this.namespaceID = nsInfo.getNamespaceID();
     this.cTime = 0;
     // store storageID as it currently is
     sd.write();
   }
 
+  /*
+   * Set ClusterID, StorageID, StorageType, CTime into
+   * DataStorage VERSION file
+  */
+  @Override
   protected void setFields(Properties props, 
                            StorageDirectory sd 
                            ) throws IOException {
-    super.setFields(props, sd);
-    props.setProperty("storageID", storageID);
+    props.setProperty("storageType", storageType.toString());
+    props.setProperty("clusterID", clusterID);
+    props.setProperty("cTime", String.valueOf(cTime));
+    props.setProperty("layoutVersion", String.valueOf(layoutVersion));
+    props.setProperty("storageID", getStorageID());
+    // Set NamespaceID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
+    if (layoutVersion >= LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      props.setProperty("namespaceID", String.valueOf(namespaceID));
+    }
   }
 
-  protected void getFields(Properties props, 
-                           StorageDirectory sd 
-                           ) throws IOException {
-    super.getFields(props, sd);
+  /*
+   * Read ClusterID, StorageID, StorageType, CTime from 
+   * DataStorage VERSION file and verify them.
+   */
+  @Override
+  protected void getFields(Properties props, StorageDirectory sd)
+      throws IOException {
+    setLayoutVersion(props, sd);
+    setcTime(props, sd);
+    setStorageType(props, sd);
+    setClusterId(props, layoutVersion, sd);
+    
+    // Read NamespaceID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
+    if (layoutVersion >= LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      setNamespaceID(props, sd);
+    }
+    
+    // validate storage ID; it may be empty
     String ssid = props.getProperty("storageID");
-    if (ssid == null ||
-        !("".equals(storageID) || "".equals(ssid) ||
-          storageID.equals(ssid)))
+    if (ssid == null) {
+      throw new InconsistentFSStateException(sd.getRoot(), "file "
+          + STORAGE_FILE_VERSION + " is invalid.");
+    }
+    String sid = getStorageID();
+    if (!(sid.equals("") || ssid.equals("") || sid.equals(ssid))) {
       throw new InconsistentFSStateException(sd.getRoot(),
-                                             "has incompatible storage Id.");
-    if ("".equals(storageID)) // update id only if it was empty
-      storageID = ssid;
+          "has incompatible storage Id.");
+    }
+    
+    if (sid.equals("")) { // update id only if it was empty
+      setStorageID(ssid);
+    }
   }
 
   @Override
@@ -228,27 +361,44 @@ public class DataStorage extends Storage
                              NamespaceInfo nsInfo, 
                              StartupOption startOpt
                              ) throws IOException {
-    if (startOpt == StartupOption.ROLLBACK)
+    if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
+    }
     sd.read();
     checkVersionUpgradable(this.layoutVersion);
     assert this.layoutVersion >= FSConstants.LAYOUT_VERSION :
       "Future version is not allowed";
-    if (getNamespaceID() != nsInfo.getNamespaceID())
-      throw new IOException(
-                            "Incompatible namespaceIDs in " + sd.getRoot().getCanonicalPath()
-                            + ": namenode namespaceID = " + nsInfo.getNamespaceID() 
-                            + "; datanode namespaceID = " + getNamespaceID());
+    
+    // For pre-federation version - validate the namespaceID
+    if (layoutVersion >= Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION &&
+        getNamespaceID() != nsInfo.getNamespaceID()) {
+      throw new IOException("Incompatible namespaceIDs in "
+          + sd.getRoot().getCanonicalPath() + ": namenode namespaceID = "
+          + nsInfo.getNamespaceID() + "; datanode namespaceID = "
+          + getNamespaceID());
+    }
+    
+    // For post federation version, validate clusterID
+    if (layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION
+        && !getClusterID().equals(nsInfo.getClusterID())) {
+      throw new IOException("Incompatible clusterIDs in "
+          + sd.getRoot().getCanonicalPath() + ": namenode clusterID = "
+          + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID());
+    }
+    
+    // regular start up
     if (this.layoutVersion == FSConstants.LAYOUT_VERSION 
         && this.cTime == nsInfo.getCTime())
       return; // regular startup
     // verify necessity of a distributed upgrade
     verifyDistributedUpgradeProgress(nsInfo);
+    // do upgrade
     if (this.layoutVersion > FSConstants.LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
       doUpgrade(sd, nsInfo);  // upgrade
       return;
     }
+    
     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
     // must shutdown
     throw new IOException("Datanode state: LV = " + this.getLayoutVersion() 
@@ -259,46 +409,74 @@ public class DataStorage extends Storage
   }
 
   /**
-   * Move current storage into a backup directory,
+   * Upgrade -- Move current storage into a backup directory,
    * and hardlink all its blocks into the new current directory.
    * 
+   * Upgrade from pre-0.22 to 0.22 or later release e.g. 0.19/0.20/ => 0.22/0.23
+   * <ul>
+   * <li> If <SD>/previous exists then delete it </li>
+   * <li> Rename <SD>/current to <SD>/previous.tmp </li>
+   * <li>Create new <SD>/current/<bpid>/current directory</li>
+   * <ul>
+   * <li> Hard links for block files are created from <SD>/previous.tmp 
+   * to <SD>/current/<bpid>/current </li>
+   * <li> Saves new version file in <SD>/current/<bpid>/current directory </li>
+   * </ul>
+   * <li> Rename <SD>/previous.tmp to <SD>/previous </li>
+   * </ul>
+   * 
+   * There should be only ONE namenode in the cluster for first 
+   * time upgrade to 0.22
    * @param sd  storage directory
-   * @throws IOException
+   * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory sd,
-                 NamespaceInfo nsInfo
-                 ) throws IOException {
+  void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+    if (layoutVersion < Storage.LAST_PRE_FEDERATION_LAYOUT_VERSION) {
+      clusterID = nsInfo.getClusterID();
+      layoutVersion = nsInfo.getLayoutVersion();
+      sd.write();
+      return;
+    }
+    
     LOG.info("Upgrading storage directory " + sd.getRoot()
              + ".\n   old LV = " + this.getLayoutVersion()
              + "; old CTime = " + this.getCTime()
              + ".\n   new LV = " + nsInfo.getLayoutVersion()
              + "; new CTime = " + nsInfo.getCTime());
+    
     File curDir = sd.getCurrentDir();
     File prevDir = sd.getPreviousDir();
-    assert curDir.exists() : "Current directory must exist.";
+    assert curDir.exists() : "Data node current directory must exist.";
     // Cleanup directory "detach"
     cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
-    // delete previous dir before upgrading
+    
+    // 1. delete <SD>/previous dir before upgrading
     if (prevDir.exists())
       deleteDir(prevDir);
+    // get previous.tmp directory, <SD>/previous.tmp
     File tmpDir = sd.getPreviousTmp();
-    assert !tmpDir.exists() : "previous.tmp directory must not exist.";
-    // rename current to tmp
+    assert !tmpDir.exists() : 
+      "Data node previous.tmp directory must not exist.";
+    
+    // 2. Rename <SD>/current to <SD>/previous.tmp
     rename(curDir, tmpDir);
-    // hard link finalized & rbw blocks
-    linkAllBlocks(tmpDir, curDir);
-    // create current directory if not exists
-    if (!curDir.exists() && !curDir.mkdirs())
-      throw new IOException("Cannot create directory " + curDir);
-    // write version file
-    this.layoutVersion = FSConstants.LAYOUT_VERSION;
-    assert this.namespaceID == nsInfo.getNamespaceID() :
-      "Data-node and name-node layout versions must be the same.";
-    this.cTime = nsInfo.getCTime();
+    
+    // 3. Format BP and hard link blocks from previous directory
+    File curBpDir = BlockPoolSliceStorage.getBpRoot(nsInfo.getBlockPoolID(), curDir);
+    BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), 
+        nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
+    bpStorage.format(curDir, nsInfo);
+    linkAllBlocks(tmpDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+    
+    // 4. Write version file under <SD>/current
+    layoutVersion = FSConstants.LAYOUT_VERSION;
+    clusterID = nsInfo.getClusterID();
     sd.write();
-    // rename tmp to previous
+    
+    // 5. Rename <SD>/previous.tmp to <SD>/previous
     rename(tmpDir, prevDir);
     LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
+    addBlockPoolStorage(nsInfo.getBlockPoolID(), bpStorage);
   }
 
   /**
@@ -325,6 +503,20 @@ public class DataStorage extends Storage
     }
   }
   
+  /** 
+   * Rolling back to a snapshot in previous directory by moving it to current
+   * directory.
+   * Rollback procedure:
+   * <br>
+   * If previous directory exists:
+   * <ol>
+   * <li> Rename current to removed.tmp </li>
+   * <li> Rename previous to current </li>
+   * <li> Remove removed.tmp </li>
+   * </ol>
+   * 
+   * Do nothing, if previous directory does not exist.
+   */
   void doRollback( StorageDirectory sd,
                    NamespaceInfo nsInfo
                    ) throws IOException {
@@ -341,10 +533,10 @@ public class DataStorage extends Storage
     if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
           && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
       throw new InconsistentFSStateException(prevSD.getRoot(),
-                                             "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
-                                             + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
-                                             + " is newer than the namespace state: LV = "
-                                             + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
+          "Cannot rollback to a newer state.\nDatanode previous state: LV = "
+              + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
+              + " is newer than the namespace state: LV = "
+              + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
     LOG.info("Rolling back storage directory " + sd.getRoot()
              + ".\n   target LV = " + nsInfo.getLayoutVersion()
              + "; target CTime = " + nsInfo.getCTime());
@@ -360,22 +552,32 @@ public class DataStorage extends Storage
     deleteDir(tmpDir);
     LOG.info("Rollback of " + sd.getRoot() + " is complete.");
   }
-
+  
+  /**
+   * Finalize procedure deletes an existing snapshot.
+   * <ol>
+   * <li>Rename previous to finalized.tmp directory</li>
+   * <li>Fully delete the finalized.tmp directory</li>
+   * </ol>
+   * 
+   * Do nothing, if previous directory does not exist
+   */
   void doFinalize(StorageDirectory sd) throws IOException {
     File prevDir = sd.getPreviousDir();
     if (!prevDir.exists())
       return; // already discarded
+    
     final String dataDirPath = sd.getRoot().getCanonicalPath();
     LOG.info("Finalizing upgrade for storage directory " 
              + dataDirPath 
              + ".\n   cur LV = " + this.getLayoutVersion()
              + "; cur CTime = " + this.getCTime());
     assert sd.getCurrentDir().exists() : "Current directory must exist.";
-    final File tmpDir = sd.getFinalizedTmp();
-    // rename previous to tmp
+    final File tmpDir = sd.getFinalizedTmp();//finalized.tmp directory
+    // 1. rename previous to finalized.tmp
     rename(prevDir, tmpDir);
 
-    // delete tmp dir in a separate thread
+    // 2. delete finalized.tmp dir in a separate thread
     new Daemon(new Runnable() {
         public void run() {
           try {
@@ -389,9 +591,24 @@ public class DataStorage extends Storage
       }).start();
   }
   
-  void finalizeUpgrade() throws IOException {
-    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
-      doFinalize(it.next());
+  
+  /*
+   * Finalize the upgrade for a block pool
+   */
+  void finalizeUpgrade(String bpID) throws IOException {
+    // To handle finalizing a snapshot taken at datanode level while 
+    // upgrading to federation, if datanode level snapshot previous exists, 
+    // then finalize it. Else finalize the corresponding BP.
+    for (StorageDirectory sd : storageDirs) {
+      File prevDir = sd.getPreviousDir();
+      if (prevDir.exists()) {
+        // data node level storage finalize
+        doFinalize(sd);
+      } else {
+        // block pool storage finalize using specific bpID
+        BlockPoolSliceStorage bpStorage = bpStorageMap.get(bpID);
+        bpStorage.doFinalize(sd.getCurrentDir());
+      }
     }
   }
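
For orientation, the upgrade, rollback, and finalize procedures documented above all navigate the same on-disk layout. A sketch of one storage directory <SD> before and after the federation upgrade (block file names and <bpid> are illustrative placeholders; finalized/, rbw/ and subdir* subdirectories are omitted for brevity):

    Pre-federation layout (before doUpgrade):
      <SD>/current/VERSION
      <SD>/current/blk_1001
      <SD>/current/blk_1001_2.meta

    Federated layout (after doUpgrade, before finalize):
      <SD>/previous/                                 the old current, kept for rollback
      <SD>/current/VERSION                           datanode-level fields, now including clusterID
      <SD>/current/<bpid>/current/VERSION            block-pool-level fields
      <SD>/current/<bpid>/current/blk_1001           hard link to the block file under previous/
      <SD>/current/<bpid>/current/blk_1001_2.meta    hard link to the meta file under previous/

doRollback() restores <SD>/previous to <SD>/current, while finalizeUpgrade(bpID) deletes <SD>/previous (via finalized.tmp) when it exists and otherwise finalizes only the block pool slice under <SD>/current/<bpid>.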
 
@@ -507,14 +724,13 @@ public class DataStorage extends Storage
   private void verifyDistributedUpgradeProgress(
                   NamespaceInfo nsInfo
                 ) throws IOException {
-    UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
+    UpgradeManagerDatanode um = 
+      DataNode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
     assert um != null : "DataNode.upgradeManager is null.";
     um.setUpgradeState(false, getLayoutVersion());
     um.initializeUpgrade(nsInfo);
   }
   
-  private static final Pattern PRE_GENSTAMP_META_FILE_PATTERN = 
-    Pattern.compile("(.*blk_[-]*\\d+)\\.meta$");
   /**
    * This is invoked on target file names when upgrading from pre generation 
    * stamp version (version -13) to correct the metatadata file name.
@@ -530,4 +746,18 @@ public class DataStorage extends Storage
     }
     return oldFileName;
   }
+
+  /**
+   * Add bpStorage into bpStorageMap
+   */
+  private void addBlockPoolStorage(String bpID, BlockPoolSliceStorage bpStorage)
+      throws IOException {
+    if (!this.bpStorageMap.containsKey(bpID)) {
+      this.bpStorageMap.put(bpID, bpStorage);
+    }
+  }
+
+  synchronized void removeBlockPoolStorage(String bpId) {
+    bpStorageMap.remove(bpId);
+  }
 }
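
Putting the pieces of this file together: a per-namenode service thread would first run the datanode-level pass (once) and then recover the block pool slice for its namenode. A sketch under assumed names (the surrounding class, field and logger names are illustrative; recoverTransitionRead(bpid, ...) and getBPStorage() are the methods added in this patch, and import locations reflect the contemporaneous tree and may differ):

    // Placed in the datanode package so the package-private methods are reachable.
    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.File;
    import java.io.IOException;
    import java.util.Collection;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
    import org.apache.hadoop.hdfs.server.common.StorageInfo;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    class BPStorageInitSketch {
      private static final Log LOG = LogFactory.getLog(BPStorageInitSketch.class);

      /** Called once per namenode, after the handshake that returned nsInfo. */
      static void initBlockPoolStorage(DataStorage storage, NamespaceInfo nsInfo,
          Collection<File> dataDirs, StartupOption startOpt) throws IOException {
        String bpid = nsInfo.getBlockPoolID();
        // The datanode-level recovery inside this call runs only once; every
        // later call just recovers the block pool slice for bpid.
        storage.recoverTransitionRead(bpid, nsInfo, dataDirs, startOpt);
        StorageInfo bpInfo = storage.getBPStorage(bpid);
        LOG.info("Block pool " + bpid + " storage ready, cTime=" + bpInfo.getCTime());
      }
    }

After this runs, each <data.dir>/current/VERSION carries storageType, clusterID, cTime, layoutVersion and storageID, with namespaceID retained only for pre-federation layout versions, as written by the setFields() change above.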

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu May  5 05:40:07 2011
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
@@ -36,15 +35,16 @@ import java.net.SocketException;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
@@ -130,10 +130,10 @@ class DataXceiver extends DataTransferPr
       opStartTime = now();
       processOp(op, in);
     } catch (Throwable t) {
-      LOG.error(datanode.dnRegistration + ":DataXceiver",t);
+      LOG.error(datanode.getMachineName() + ":DataXceiver",t);
     } finally {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+        LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
             + datanode.getXceiverCount());
       }
       updateCurrentThreadName("Cleaning up");
@@ -147,7 +147,7 @@ class DataXceiver extends DataTransferPr
    * Read a block from the disk.
    */
   @Override
-  protected void opReadBlock(DataInputStream in, Block block,
+  protected void opReadBlock(DataInputStream in, ExtendedBlock block,
       long startOffset, long length, String clientName,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     OutputStream baseStream = NetUtils.getOutputStream(s, 
@@ -160,12 +160,14 @@ class DataXceiver extends DataTransferPr
   
     // send the block
     BlockSender blockSender = null;
+    DatanodeRegistration dnR = 
+      datanode.getDNRegistrationForBP(block.getBlockPoolId());
     final String clientTraceFmt =
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
             "%d", "HDFS_READ", clientName, "%d",
-            datanode.dnRegistration.getStorageID(), block, "%d")
-        : datanode.dnRegistration + " Served block " + block + " to " +
+            dnR.getStorageID(), block, "%d")
+        : dnR + " Served block " + block + " to " +
             s.getInetAddress();
 
     updateCurrentThreadName("Sending block " + block);
@@ -180,18 +182,6 @@ class DataXceiver extends DataTransferPr
 
       SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
-
-      // If client verification succeeded, and if it's for the whole block,
-      // tell the DataBlockScanner that it's good. This is an optional response
-      // from the client. If absent, we close the connection (which is what we
-      // always do anyways).
-      try {
-        if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK) {
-          if (blockSender.isBlockReadFully() && datanode.blockScanner != null) {
-            datanode.blockScanner.verifiedByClient(block);
-          }
-        }
-      } catch (IOException ignored) {}
       
       datanode.myMetrics.bytesRead.inc((int) read);
       datanode.myMetrics.blocksRead.inc();
@@ -202,7 +192,7 @@ class DataXceiver extends DataTransferPr
       /* What exactly should we do here?
        * Earlier version shutdown() datanode if there is disk error.
        */
-      LOG.warn(datanode.dnRegistration +  ":Got exception while serving " + 
+      LOG.warn(dnR +  ":Got exception while serving " + 
           block + " to " +
                 s.getInetAddress() + ":\n" + 
                 StringUtils.stringifyException(ioe) );
@@ -222,7 +212,7 @@ class DataXceiver extends DataTransferPr
    * Write a block to disk.
    */
   @Override
-  protected void opWriteBlock(final DataInputStream in, final Block block, 
+  protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, 
       final int pipelineSize, final BlockConstructionStage stage,
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
       final String clientname, final DatanodeInfo srcDataNode,
@@ -257,8 +247,7 @@ class DataXceiver extends DataTransferPr
     // We later mutate block's generation stamp and length, but we need to
     // forward the original version of the block to downstream mirrors, so
     // make a copy here.
-    final Block originalBlock = new Block(block);
-
+    final ExtendedBlock originalBlock = new ExtendedBlock(block);
     block.setNumBytes(dataXceiverServer.estimateBlockSize);
     LOG.info("Receiving block " + block + 
              " src: " + remoteAddress +
@@ -352,7 +341,7 @@ class DataXceiver extends DataTransferPr
           if (isClient) {
             throw e;
           } else {
-            LOG.info(datanode.dnRegistration + ":Exception transfering block " +
+            LOG.info(datanode + ":Exception transferring block " +
                      block + " to mirror " + mirrorNode +
                      ". continuing without the mirror.\n" +
                      StringUtils.stringifyException(e));
@@ -427,7 +416,7 @@ class DataXceiver extends DataTransferPr
 
   @Override
   protected void opTransferBlock(final DataInputStream in,
-      final Block blk, final String client,
+      final ExtendedBlock blk, final String client,
       final DatanodeInfo[] targets,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     checkAccess(null, true, blk, blockToken,
@@ -450,14 +439,13 @@ class DataXceiver extends DataTransferPr
    * Get block checksum (MD5 of CRC32).
    */
   @Override
-  protected void opBlockChecksum(DataInputStream in, Block block,
+  protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     checkAccess(out, true, block, blockToken,
         DataTransferProtocol.Op.BLOCK_CHECKSUM,
         BlockTokenSecretManager.AccessMode.READ);
-
     updateCurrentThreadName("Reading metadata for block " + block);
     final MetaDataInputStream metadataIn = 
       datanode.data.getMetaDataInputStream(block);
@@ -501,13 +489,13 @@ class DataXceiver extends DataTransferPr
    * Read a block from the disk and then sends it to a destination.
    */
   @Override
-  protected void opCopyBlock(DataInputStream in, Block block,
+  protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);
     // Read in the header
     if (datanode.isBlockTokenEnabled) {
       try {
-        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
             BlockTokenSecretManager.AccessMode.COPY);
       } catch (InvalidToken e) {
         LOG.warn("Invalid access token in request from " + remoteAddress
@@ -577,7 +565,7 @@ class DataXceiver extends DataTransferPr
    */
   @Override
   protected void opReplaceBlock(DataInputStream in,
-      Block block, String sourceID, DatanodeInfo proxySource,
+      ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Replacing block " + block + " from " + sourceID);
 
@@ -585,7 +573,7 @@ class DataXceiver extends DataTransferPr
     block.setNumBytes(dataXceiverServer.estimateBlockSize);
     if (datanode.isBlockTokenEnabled) {
       try {
-        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+        datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
             BlockTokenSecretManager.AccessMode.REPLACE);
       } catch (InvalidToken e) {
         LOG.warn("Invalid access token in request from " + remoteAddress
@@ -713,13 +701,13 @@ class DataXceiver extends DataTransferPr
   }
 
   private void checkAccess(DataOutputStream out, final boolean reply, 
-      final Block blk,
+      final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final DataTransferProtocol.Op op,
       final BlockTokenSecretManager.AccessMode mode) throws IOException {
     if (datanode.isBlockTokenEnabled) {
       try {
-        datanode.blockTokenSecretManager.checkAccess(t, null, blk, mode);
+        datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
       } catch(InvalidToken e) {
         try {
           if (reply) {
@@ -729,7 +717,9 @@ class DataXceiver extends DataTransferPr
             }
             ERROR_ACCESS_TOKEN.write(out);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
-              Text.writeString(out, datanode.dnRegistration.getName());
+              DatanodeRegistration dnR = 
+                datanode.getDNRegistrationForBP(blk.getBlockPoolId());
+              Text.writeString(out, dnR.getName());
             }
             out.flush();
           }
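
The recurring substitution in this file -- Block becoming ExtendedBlock, and the former datanode.dnRegistration / blockTokenSecretManager references becoming per-block-pool lookups -- follows one pattern. A condensed sketch (the class and method names here are illustrative; getDNRegistrationForBP(), blockPoolTokenSecretManager and ExtendedBlock.getBlockPoolId() are the calls used in the patch):

    // Placed in the datanode package so the fields used by DataXceiver are reachable.
    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
    import org.apache.hadoop.security.token.Token;

    class PerBlockPoolAccessSketch {
      private static final Log LOG = LogFactory.getLog(PerBlockPoolAccessSketch.class);

      static void checkAndLog(DataNode datanode, ExtendedBlock block,
          Token<BlockTokenIdentifier> token, String remoteAddress) throws IOException {
        // The registration is resolved per block pool instead of coming from a
        // single dnRegistration field.
        DatanodeRegistration dnR =
            datanode.getDNRegistrationForBP(block.getBlockPoolId());
        // Token checks now go through the per-block-pool secret manager.
        datanode.blockPoolTokenSecretManager.checkAccess(token, null, block,
            BlockTokenSecretManager.AccessMode.READ);
        LOG.info(dnR + " serving " + block + " (storage " + dnR.getStorageID()
            + ") to " + remoteAddress);
      }
    }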

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu May  5 05:40:07 2011
@@ -140,10 +140,10 @@ class DataXceiverServer implements Runna
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
       } catch (IOException ie) {
-        LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " 
+        LOG.warn(datanode.getMachineName() + ":DataXceiveServer: " 
                                 + StringUtils.stringifyException(ie));
       } catch (Throwable te) {
-        LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:" 
+        LOG.error(datanode.getMachineName() + ":DataXceiveServer: Exiting due to:" 
                                  + StringUtils.stringifyException(te));
         datanode.shouldRun = false;
       }
@@ -151,7 +151,7 @@ class DataXceiverServer implements Runna
     try {
       ss.close();
     } catch (IOException ie) {
-      LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " 
+      LOG.warn(datanode.getMachineName() + ":DataXceiveServer: " 
                               + StringUtils.stringifyException(ie));
     }
   }
@@ -162,7 +162,7 @@ class DataXceiverServer implements Runna
     try {
       this.ss.close();
     } catch (IOException ie) {
-      LOG.warn(datanode.dnRegistration + ":DataXceiveServer.kill(): " 
+      LOG.warn(datanode.getMachineName() + ":DataXceiveServer.kill(): " 
                               + StringUtils.stringifyException(ie));
     }
 

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Thu May  5 05:40:07 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -49,8 +50,6 @@ import org.apache.hadoop.util.StringUtil
 
 @InterfaceAudience.Private
 public class DatanodeJspHelper {
-  private static final DataNode datanode = DataNode.getDataNode();
-
   private static DFSClient getDFSClient(final UserGroupInformation user,
                                         final InetSocketAddress addr,
                                         final Configuration conf
@@ -94,13 +93,20 @@ public class DatanodeJspHelper {
     int namenodeInfoPort = -1;
     if (namenodeInfoPortStr != null)
       namenodeInfoPort = Integer.parseInt(namenodeInfoPortStr);
-
-    DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
+    final String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
+    if (nnAddr == null){
+      out.print(JspHelper.NAMENODE_ADDRESS + " url param is null");
+      return;
+    }
+    
+    InetSocketAddress namenodeAddress = DFSUtil.getSocketAddress(nnAddr);
+    DFSClient dfs = getDFSClient(ugi, namenodeAddress, conf);
     String target = dir;
     final HdfsFileStatus targetStatus = dfs.getFileInfo(target);
     if (targetStatus == null) { // not exists
       out.print("<h3>File or directory : " + target + " does not exist</h3>");
-      JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, target);
+      JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, target,
+          nnAddr);
     } else {
       if (!targetStatus.isDir()) { // a file
         List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(dir, 0, 1)
@@ -128,7 +134,8 @@ public class DatanodeJspHelper {
               + firstBlock.getBlock().getGenerationStamp() + "&filename="
               + URLEncoder.encode(dir, "UTF-8") + "&datanodePort="
               + datanodePort + "&namenodeInfoPort=" + namenodeInfoPort
-              + JspHelper.getDelegationTokenUrlParam(tokenString);
+              + JspHelper.getDelegationTokenUrlParam(tokenString)
+              + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
           resp.sendRedirect(redirectLocation);
         }
         return;
@@ -138,9 +145,10 @@ public class DatanodeJspHelper {
       String[] headings = { "Name", "Type", "Size", "Replication",
           "Block Size", "Modification Time", "Permission", "Owner", "Group" };
       out.print("<h3>Contents of directory ");
-      JspHelper.printPathWithLinks(dir, out, namenodeInfoPort, tokenString);
+      JspHelper.printPathWithLinks(dir, out, namenodeInfoPort, tokenString,
+          nnAddr);
       out.print("</h3><hr>");
-      JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, dir);
+      JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, dir, nnAddr);
       out.print("<hr>");
 
       File f = new File(dir);
@@ -149,6 +157,7 @@ public class DatanodeJspHelper {
         out.print("<a href=\"" + req.getRequestURL() + "?dir=" + parent
             + "&namenodeInfoPort=" + namenodeInfoPort
             + JspHelper.getDelegationTokenUrlParam(tokenString)
+            + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr)
             + "\">Go to parent directory</a><br>");
 
       DirectoryListing thisListing = 
@@ -179,7 +188,8 @@ public class DatanodeJspHelper {
             String datanodeUrl = req.getRequestURL() + "?dir="
               + URLEncoder.encode(files[i].getFullName(target), "UTF-8")
               + "&namenodeInfoPort=" + namenodeInfoPort
-              + JspHelper.getDelegationTokenUrlParam(tokenString);
+              + JspHelper.getDelegationTokenUrlParam(tokenString)
+              + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
             cols[0] = "<a href=\"" + datanodeUrl + "\">"
               + localFileName + "</a>";
             cols[5] = lsDateFormat.format(new Date((files[i]
@@ -197,7 +207,7 @@ public class DatanodeJspHelper {
         JspHelper.addTableFooter(out);
       }
     }
-    String namenodeHost = datanode.getNameNodeAddrForClient().getHostName();
+    String namenodeHost = namenodeAddress.getHostName();
     out.print("<br><a href=\"http://"
         + InetAddress.getByName(namenodeHost).getCanonicalHostName() + ":"
         + namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
@@ -232,6 +242,11 @@ public class DatanodeJspHelper {
     int namenodeInfoPort = -1;
     if (namenodeInfoPortStr != null)
       namenodeInfoPort = Integer.parseInt(namenodeInfoPortStr);
+    final String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
+    if (nnAddr == null){
+      out.print(JspHelper.NAMENODE_ADDRESS + " url param is null");
+      return;
+    }
 
     final int chunkSizeToView = JspHelper.string2ChunkSizeToView(
         req.getParameter("chunkSizeToView"), getDefaultChunkSize(conf));
@@ -250,15 +265,15 @@ public class DatanodeJspHelper {
       return;
     }
 
-    String blockSizeStr = req.getParameter("blockSize");
-    long blockSize = 0;
+    final String blockSizeStr = req.getParameter("blockSize");
     if (blockSizeStr == null || blockSizeStr.length() == 0) {
       out.print("Invalid input");
       return;
     }
-    blockSize = Long.parseLong(blockSizeStr);
+    long blockSize = Long.parseLong(blockSizeStr);
 
-    final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
+    final InetSocketAddress namenodeAddress = DFSUtil.getSocketAddress(nnAddr);
+    final DFSClient dfs = getDFSClient(ugi, namenodeAddress, conf);
     List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
         Long.MAX_VALUE).getLocatedBlocks();
     // Add the various links for looking at the file contents
@@ -266,7 +281,8 @@ public class DatanodeJspHelper {
     String downloadUrl = "http://" + req.getServerName() + ":"
         + req.getServerPort() + "/streamFile?" + "filename="
         + URLEncoder.encode(filename, "UTF-8")
-        + JspHelper.getDelegationTokenUrlParam(tokenString);
+        + JspHelper.getDelegationTokenUrlParam(tokenString)
+        + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
     out.print("<a name=\"viewOptions\"></a>");
     out.print("<a href=\"" + downloadUrl + "\">Download this file</a><br>");
 
@@ -287,6 +303,7 @@ public class DatanodeJspHelper {
         + "&namenodeInfoPort=" + namenodeInfoPort
         + "&chunkSizeToView=" + chunkSizeToView
         + JspHelper.getDelegationTokenUrlParam(tokenString)
+        + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr)
         + "&referrer=" + URLEncoder.encode(
             req.getRequestURL() + "?" + req.getQueryString(), "UTF-8");
     out.print("<a href=\"" + tailUrl + "\">Tail this file</a><br>");
@@ -305,6 +322,8 @@ public class DatanodeJspHelper {
         + datanodePort + "\">");
     out.print("<input type=\"hidden\" name=\"namenodeInfoPort\" value=\""
         + namenodeInfoPort + "\">");
+    out.print("<input type=\"hidden\" name=\"" + JspHelper.NAMENODE_ADDRESS
+        + "\" value=\"" + nnAddr + "\">");
     out.print("<input type=\"text\" name=\"chunkSizeToView\" value="
         + chunkSizeToView + " size=10 maxlength=10>");
     out.print("&nbsp;&nbsp;<input type=\"submit\" name=\"submit\" value=\"Refresh\">");
@@ -315,7 +334,7 @@ public class DatanodeJspHelper {
     // generate a table and dump the info
     out.println("\n<table>");
     
-    String namenodeHost = datanode.getNameNodeAddrForClient().getHostName();
+    String namenodeHost = namenodeAddress.getHostName();
     String namenodeHostName = InetAddress.getByName(namenodeHost).getCanonicalHostName();
     
     for (LocatedBlock cur : blocks) {
@@ -337,7 +356,8 @@ public class DatanodeJspHelper {
             + "&genstamp=" + cur.getBlock().getGenerationStamp()
             + "&namenodeInfoPort=" + namenodeInfoPort
             + "&chunkSizeToView=" + chunkSizeToView
-            + JspHelper.getDelegationTokenUrlParam(tokenString);
+            + JspHelper.getDelegationTokenUrlParam(tokenString)
+            + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
 
         String blockInfoUrl = "http://" + namenodeHostName + ":"
             + namenodeInfoPort
@@ -363,8 +383,13 @@ public class DatanodeJspHelper {
     long startOffset = 0;
     int datanodePort = 0;
 
-    String namenodeInfoPortStr = req.getParameter("namenodeInfoPort");
-    String tokenString = req.getParameter(JspHelper.DELEGATION_PARAMETER_NAME);
+    final String namenodeInfoPortStr = req.getParameter("namenodeInfoPort");
+    final String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
+    if (nnAddr == null) {
+      out.print(JspHelper.NAMENODE_ADDRESS + " url param is null");
+      return;
+    }
+    final String tokenString = req.getParameter(JspHelper.DELEGATION_PARAMETER_NAME);
     UserGroupInformation ugi = JspHelper.getUGI(req, conf);
     int namenodeInfoPort = -1;
     if (namenodeInfoPortStr != null)
@@ -382,9 +407,11 @@ public class DatanodeJspHelper {
       out.print("Invalid input (blockId absent)");
       return;
     }
+    
+    final DFSClient dfs = getDFSClient(ugi, 
+        DFSUtil.getSocketAddress(nnAddr), conf);
 
-    final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
-
+    String bpid = null;
     Token<BlockTokenIdentifier> blockToken = BlockTokenSecretManager.DUMMY_TOKEN;
     if (conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
@@ -398,6 +425,7 @@ public class DatanodeJspHelper {
       }
       for (int i = 0; i < blks.size(); i++) {
         if (blks.get(i).getBlock().getBlockId() == blockId) {
+          bpid = blks.get(i).getBlock().getBlockPoolId();
           blockToken = blks.get(i).getBlockToken();
           break;
         }
@@ -435,16 +463,17 @@ public class DatanodeJspHelper {
     datanodePort = Integer.parseInt(datanodePortStr);
     out.print("<h3>File: ");
     JspHelper.printPathWithLinks(filename, out, namenodeInfoPort,
-                                 tokenString);
+                                 tokenString, nnAddr);
     out.print("</h3><hr>");
     String parent = new File(filename).getParent();
-    JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, parent);
+    JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, parent, nnAddr);
     out.print("<hr>");
     out.print("<a href=\"http://"
         + req.getServerName() + ":" + req.getServerPort()
         + "/browseDirectory.jsp?dir=" + URLEncoder.encode(parent, "UTF-8")
         + "&namenodeInfoPort=" + namenodeInfoPort
         + JspHelper.getDelegationTokenUrlParam(tokenString)
+        + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr)
         + "\"><i>Go back to dir listing</i></a><br>");
     out.print("<a href=\"#viewOptions\">Advanced view/download options</a><br>");
     out.print("<hr>");
@@ -499,7 +528,8 @@ public class DatanodeJspHelper {
           + "&chunkSizeToView=" + chunkSizeToView
           + "&datanodePort=" + nextDatanodePort
           + "&namenodeInfoPort=" + namenodeInfoPort
-          + JspHelper.getDelegationTokenUrlParam(tokenString);
+          + JspHelper.getDelegationTokenUrlParam(tokenString)
+          + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
       out.print("<a href=\"" + nextUrl + "\">View Next chunk</a>&nbsp;&nbsp;");
     }
     // determine data for the prev link
@@ -556,14 +586,15 @@ public class DatanodeJspHelper {
           + "&genstamp=" + prevGenStamp
           + "&datanodePort=" + prevDatanodePort
           + "&namenodeInfoPort=" + namenodeInfoPort
-          + JspHelper.getDelegationTokenUrlParam(tokenString);
+          + JspHelper.getDelegationTokenUrlParam(tokenString)
+          + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
       out.print("<a href=\"" + prevUrl + "\">View Prev chunk</a>&nbsp;&nbsp;");
     }
     out.print("<hr>");
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
-          datanodePort), blockId, blockToken, genStamp, blockSize,
+          datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
           startOffset, chunkSizeToView, out, conf);
     } catch (Exception e) {
       out.print(e);
@@ -592,6 +623,7 @@ public class DatanodeJspHelper {
     UserGroupInformation ugi = JspHelper.getUGI(req, conf);
 
     String namenodeInfoPortStr = req.getParameter("namenodeInfoPort");
+    String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
     int namenodeInfoPort = -1;
     if (namenodeInfoPortStr != null)
       namenodeInfoPort = Integer.parseInt(namenodeInfoPortStr);
@@ -602,7 +634,7 @@ public class DatanodeJspHelper {
     if (!noLink) {
       out.print("<h3>Tail of File: ");
       JspHelper.printPathWithLinks(filename, out, namenodeInfoPort, 
-                                   tokenString);
+                                   tokenString, nnAddr);
       out.print("</h3><hr>");
       out.print("<a href=\"" + referrer + "\">Go Back to File View</a><hr>");
     } else {
@@ -616,12 +648,15 @@ public class DatanodeJspHelper {
         + "\">");
     out.print("<input type=\"hidden\" name=\"namenodeInfoPort\" value=\""
         + namenodeInfoPort + "\">");
+    out.print("<input type=\"hidden\" name=\"" + JspHelper.NAMENODE_ADDRESS
+        + "\" value=\"" + nnAddr + "\">");
     if (!noLink)
       out.print("<input type=\"hidden\" name=\"referrer\" value=\"" + referrer
           + "\">");
 
     // fetch the block from the datanode that has the last block for this file
-    final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddrForClient(), conf);
+    final DFSClient dfs = getDFSClient(ugi, DFSUtil.getSocketAddress(nnAddr),
+        conf);
     List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
         Long.MAX_VALUE).getLocatedBlocks();
     if (blocks == null || blocks.size() == 0) {
@@ -630,6 +665,7 @@ public class DatanodeJspHelper {
       return;
     }
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
+    String poolId = lastBlk.getBlock().getBlockPoolId();
     long blockSize = lastBlk.getBlock().getNumBytes();
     long blockId = lastBlk.getBlock().getBlockId();
     Token<BlockTokenIdentifier> accessToken = lastBlk.getBlockToken();
@@ -648,9 +684,18 @@ public class DatanodeJspHelper {
         - chunkSizeToView : 0;
 
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
-    JspHelper.streamBlockInAscii(addr, blockId, accessToken, genStamp,
+    JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
         blockSize, startOffset, chunkSizeToView, out, conf);
     out.print("</textarea>");
     dfs.close();
   }
+  
+  
+  /** Get DFSClient for a namenode corresponding to the BPID from a datanode */
+  public static DFSClient getDFSClient(final HttpServletRequest request,
+      final DataNode datanode, final Configuration conf,
+      final UserGroupInformation ugi) throws IOException, InterruptedException {
+    final String nnAddr = request.getParameter(JspHelper.NAMENODE_ADDRESS);
+    return getDFSClient(ugi, DFSUtil.getSocketAddress(nnAddr), conf);
+  }
 }
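
The common thread in this file is that the namenode address is no longer obtained from a static DataNode reference; it arrives as a request parameter and is threaded through every link the datanode JSPs emit. A trimmed sketch of that flow (the helper class and method names here are illustrative, and import locations reflect the contemporaneous tree and may differ; JspHelper.NAMENODE_ADDRESS, JspHelper.getUrlParam(), JspHelper.getDelegationTokenUrlParam() and DFSUtil.getSocketAddress() are the calls used in the patch):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.URLEncoder;

    import javax.servlet.http.HttpServletRequest;

    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.server.common.JspHelper;

    class NamenodeParamSketch {
      /** Resolve which namenode this request is about. */
      static InetSocketAddress namenodeFor(HttpServletRequest req) throws IOException {
        String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
        if (nnAddr == null) {
          throw new IOException(JspHelper.NAMENODE_ADDRESS + " url param is null");
        }
        return DFSUtil.getSocketAddress(nnAddr);
      }

      /** Build a directory-browsing link that carries the namenode address along. */
      static String browseLink(HttpServletRequest req, String dir,
          int namenodeInfoPort, String tokenString) throws IOException {
        String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
        return req.getRequestURL() + "?dir=" + URLEncoder.encode(dir, "UTF-8")
            + "&namenodeInfoPort=" + namenodeInfoPort
            + JspHelper.getDelegationTokenUrlParam(tokenString)
            + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
      }
    }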

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1099687&r1=1099686&r2=1099687&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Thu May  5 05:40:07 2011
@@ -22,12 +22,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.util.DaemonFactory;
 
 /**
  * Periodically scans the data directories for block and block metadata files.
@@ -44,23 +50,102 @@ import org.apache.hadoop.hdfs.server.dat
  * {@link FSDataset}
  */
 @InterfaceAudience.Private
-public class DirectoryScanner {
+public class DirectoryScanner implements Runnable {
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
   private static final int DEFAULT_SCAN_INTERVAL = 21600;
 
   private final FSDataset dataset;
-  private long scanPeriod;
-  private long lastScanTime;
-  private ExecutorService reportCompileThreadPool;
-
-  LinkedList<ScanInfo> diff = new LinkedList<ScanInfo>();
-
-  /** Stats tracked for reporting and testing */
-  long totalBlocks;
-  long missingMetaFile;
-  long missingBlockFile;
-  long missingMemoryBlocks;
-  long mismatchBlocks;
+  private final ExecutorService reportCompileThreadPool;
+  private final ScheduledExecutorService masterThread;
+  private final long scanPeriodMsecs;
+  private volatile boolean shouldRun = false;
+  private boolean retainDiffs = false;
+
+  ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
+  Map<String, Stats> stats = new HashMap<String, Stats>();
+  
+  /**
+   * Allow retaining diffs for unit test and analysis
+   * @param b - defaults to false (off)
+   */
+  void setRetainDiffs(boolean b) {
+    retainDiffs = b;
+  }
+
+  /** Stats tracked for reporting and testing, per blockpool */
+  static class Stats {
+    String bpid;
+    long totalBlocks = 0;
+    long missingMetaFile = 0;
+    long missingBlockFile = 0;
+    long missingMemoryBlocks = 0;
+    long mismatchBlocks = 0;
+    
+    public Stats(String bpid) {
+      this.bpid = bpid;
+    }
+    
+    public String toString() {
+      return "BlockPool " + bpid
+      + " Total blocks: " + totalBlocks + ", missing metadata files:"
+      + missingMetaFile + ", missing block files:" + missingBlockFile
+      + ", missing blocks in memory:" + missingMemoryBlocks
+      + ", mismatched blocks:" + mismatchBlocks;
+    }
+  }
+  
+  static class ScanInfoPerBlockPool extends 
+                     HashMap<String, LinkedList<ScanInfo>> {
+    
+    private static final long serialVersionUID = 1L;
+
+    ScanInfoPerBlockPool() {super();}
+    
+    ScanInfoPerBlockPool(int sz) {super(sz);}
+    
+    /**
+     * Merges "that" ScanInfoPerBlockPool into this one
+     * @param that
+     */
+    public void addAll(ScanInfoPerBlockPool that) {
+      if (that == null) return;
+      
+      for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) {
+        String bpid = entry.getKey();
+        LinkedList<ScanInfo> list = entry.getValue();
+        
+        if (this.containsKey(bpid)) {
+          //merge that per-bpid linked list with this one
+          this.get(bpid).addAll(list);
+        } else {
+          //add that new bpid and its linked list to this
+          this.put(bpid, list);
+        }
+      }
+    }
+    
+    /**
+     * Convert all the LinkedList values in this ScanInfoPerBlockPool map
+     * into sorted arrays, and return a new map of these arrays per blockpool
+     * @return a map of ScanInfo arrays per blockpool
+     */
+    public Map<String, ScanInfo[]> toSortedArrays() {
+      Map<String, ScanInfo[]> result = 
+        new HashMap<String, ScanInfo[]>(this.size());
+      
+      for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) {
+        String bpid = entry.getKey();
+        LinkedList<ScanInfo> list = entry.getValue();
+        
+        // convert list to array
+        ScanInfo[] record = list.toArray(new ScanInfo[list.size()]);
+        // Sort array based on blockId
+        Arrays.sort(record);
+        result.put(bpid, record);            
+      }
+      return result;
+    }
+  }
 
   /**
    * Tracks the files and other information related to a block on the disk
@@ -137,34 +222,84 @@ public class DirectoryScanner {
     this.dataset = dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
         DEFAULT_SCAN_INTERVAL);
-    scanPeriod = interval * 1000L;
+    scanPeriodMsecs = interval * 1000L; //msec
     int threads = 
         conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                     DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
 
-    reportCompileThreadPool = Executors.newFixedThreadPool(threads);
+    reportCompileThreadPool = Executors.newFixedThreadPool(threads, 
+        new DaemonFactory());
+    masterThread = new ScheduledThreadPoolExecutor(1, new DaemonFactory());
+  }
 
+  void start() {
+    shouldRun = true;
     Random rand = new Random();
-    lastScanTime = System.currentTimeMillis() - (rand.nextInt(interval) * 1000L);
-    LOG.info("scan starts at " + (lastScanTime + scanPeriod)
-        + " with interval " + scanPeriod);
+    long offset = rand.nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec
+    long firstScanTime = System.currentTimeMillis() + offset;
+    LOG.info("Periodic Directory Tree Verification scan starting at " 
+        + firstScanTime + " with interval " + scanPeriodMsecs);
+    masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, 
+                                     TimeUnit.MILLISECONDS);
   }
-
-  boolean newScanPeriod(long now) {
-    return now > lastScanTime + scanPeriod;
+  
+  // for unit test
+  boolean getRunStatus() {
+    return shouldRun;
   }
 
   private void clear() {
-    diff.clear();
-    totalBlocks = 0;
-    missingMetaFile = 0;
-    missingBlockFile = 0;
-    missingMemoryBlocks = 0;
-    mismatchBlocks = 0;
+    diffs.clear();
+    stats.clear();
+  }
+
+  /**
+   * Main program loop for DirectoryScanner
+   * Runs "reconcile()" periodically under the masterThread.
+   */
+  @Override
+  public void run() {
+    try {
+      if (!shouldRun) {
+        //shutdown has been activated
+        LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated");
+        return;
+      }
+
+      String[] bpids = dataset.getBPIdlist();
+      for(String bpid : bpids) {
+        UpgradeManagerDatanode um = 
+          DataNode.getUpgradeManagerDatanode(bpid);
+        if (um != null && !um.isUpgradeCompleted()) {
+          //If a distributed upgrade is underway, exit and wait for the next cycle.
+          LOG.warn("this cycle terminating immediately because a Distributed Upgrade is in progress");
+          return; 
+        }
+      }
+      
+      //We're okay to run - do it
+      reconcile();      
+      
+    } catch (Exception e) {
+      //Log and continue - allows Executor to run again next cycle
+      LOG.error("Exception during DirectoryScanner execution - will continue next cycle", e);
+    } catch (Error er) {
+      //Non-recoverable error - re-throw after logging the problem
+      LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", er);
+      throw er;
+    }
   }
   
   void shutdown() {
-    reportCompileThreadPool.shutdown();
+    if (!shouldRun) {
+      LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
+    } else {
+      LOG.warn("DirectoryScanner: shutdown has been called");      
+    }
+    shouldRun = false;
+    if (masterThread != null) masterThread.shutdown();
+    if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown();
+    if (!retainDiffs) clear();
   }
 
   /**
@@ -172,111 +307,130 @@ public class DirectoryScanner {
    */
   void reconcile() {
     scan();
-    for (ScanInfo info : diff) {
-      dataset.checkAndUpdate(info.getBlockId(), info.getBlockFile(), info
-          .getMetaFile(), info.getVolume());
+    for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
+      String bpid = entry.getKey();
+      LinkedList<ScanInfo> diff = entry.getValue();
+      
+      for (ScanInfo info : diff) {
+        dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
+            info.getMetaFile(), info.getVolume());
+      }
     }
+    if (!retainDiffs) clear();
   }
 
   /**
    * Scan for the differences between disk and in-memory blocks
+   * Scan only the "finalized blocks" lists of both disk and memory.
    */
   void scan() {
     clear();
-    ScanInfo[] diskReport = getDiskReport();
-    totalBlocks = diskReport.length;
+    Map<String, ScanInfo[]> diskReport = getDiskReport();
 
     // Hold FSDataset lock to prevent further changes to the block map
     synchronized(dataset) {
-      Block[] memReport = dataset.getBlockList(false);
-      Arrays.sort(memReport); // Sort based on blockId
-
-      int d = 0; // index for diskReport
-      int m = 0; // index for memReprot
-      while (m < memReport.length && d < diskReport.length) {
-        Block memBlock = memReport[Math.min(m, memReport.length - 1)];
-        ScanInfo info = diskReport[Math.min(d, diskReport.length - 1)];
-        if (info.getBlockId() < memBlock.getBlockId()) {
-          // Block is missing in memory
-          missingMemoryBlocks++;
-          addDifference(info);
+      for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
+        String bpid = entry.getKey();
+        ScanInfo[] blockpoolReport = entry.getValue();
+        
+        Stats statsRecord = new Stats(bpid);
+        stats.put(bpid, statsRecord);
+        LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
+        diffs.put(bpid, diffRecord);
+        
+        statsRecord.totalBlocks = blockpoolReport.length;
+        List<Block> bl = dataset.getFinalizedBlocks(bpid);
+        Block[] memReport = bl.toArray(new Block[bl.size()]);
+        Arrays.sort(memReport); // Sort based on blockId
+  
+        int d = 0; // index for blockpoolReport
+        int m = 0; // index for memReport
+        while (m < memReport.length && d < blockpoolReport.length) {
+          Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+          ScanInfo info = blockpoolReport[Math.min(
+              d, blockpoolReport.length - 1)];
+          if (info.getBlockId() < memBlock.getBlockId()) {
+            // Block is missing in memory
+            statsRecord.missingMemoryBlocks++;
+            addDifference(diffRecord, statsRecord, info);
+            d++;
+            continue;
+          }
+          if (info.getBlockId() > memBlock.getBlockId()) {
+            // Block is missing on the disk
+            addDifference(diffRecord, statsRecord, memBlock.getBlockId());
+            m++;
+            continue;
+          }
+          // Block file and/or metadata file exists on the disk
+          // Block exists in memory
+          if (info.getBlockFile() == null) {
+            // Block metadata file exists and block file is missing
+            addDifference(diffRecord, statsRecord, info);
+          } else if (info.getGenStamp() != memBlock.getGenerationStamp()
+              || info.getBlockFile().length() != memBlock.getNumBytes()) {
+            // Block metadata file is missing or has wrong generation stamp,
+            // or block file length is different than expected
+            statsRecord.mismatchBlocks++;
+            addDifference(diffRecord, statsRecord, info);
+          }
           d++;
-          continue;
-        }
-        if (info.getBlockId() > memBlock.getBlockId()) {
-          // Block is missing on the disk
-          addDifference(memBlock.getBlockId());
           m++;
-          continue;
         }
-        // Block file and/or metadata file exists on the disk
-        // Block exists in memory
-        if (info.getBlockFile() == null) {
-          // Block metadata file exits and block file is missing
-          addDifference(info);
-        } else if (info.getGenStamp() != memBlock.getGenerationStamp()
-            || info.getBlockFile().length() != memBlock.getNumBytes()) {
-          mismatchBlocks++;
-          addDifference(info);
+        while (m < memReport.length) {
+          addDifference(diffRecord, statsRecord, memReport[m++].getBlockId());
         }
-        d++;
-        m++;
-      }
-      while (m < memReport.length) {
-        addDifference(memReport[m++].getBlockId());
-      }
-      while (d < diskReport.length) {
-        missingMemoryBlocks++;
-        addDifference(diskReport[d++]);
-      }
-    }
-    LOG.info("Total blocks: " + totalBlocks + ", missing metadata files:"
-        + missingMetaFile + ", missing block files:" + missingBlockFile
-        + ", missing blocks in memory:" + missingMemoryBlocks
-        + ", mismatched blocks:" + mismatchBlocks);
-    lastScanTime = System.currentTimeMillis();
+        while (d < blockpoolReport.length) {
+          statsRecord.missingMemoryBlocks++;
+          addDifference(diffRecord, statsRecord, blockpoolReport[d++]);
+        }
+        LOG.info(statsRecord.toString());
+      } //end for
+    } //end synchronized
   }
 
   /**
    * Block is found on the disk. In-memory block is missing or does not match
    * the block on the disk
    */
-  private void addDifference(ScanInfo info) {
-    missingMetaFile += info.getMetaFile() == null ? 1 : 0;
-    missingBlockFile += info.getBlockFile() == null ? 1 : 0;
-    diff.add(info);
+  private void addDifference(LinkedList<ScanInfo> diffRecord, 
+                             Stats statsRecord, ScanInfo info) {
+    statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
+    statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
+    diffRecord.add(info);
   }
 
   /** Block is not found on the disk */
-  private void addDifference(long blockId) {
-    missingBlockFile++;
-    missingMetaFile++;
-    diff.add(new ScanInfo(blockId));
+  private void addDifference(LinkedList<ScanInfo> diffRecord,
+                             Stats statsRecord, long blockId) {
+    statsRecord.missingBlockFile++;
+    statsRecord.missingMetaFile++;
+    diffRecord.add(new ScanInfo(blockId));
   }
 
-  /** Get list of blocks on the disk sorted by blockId */
-  private ScanInfo[] getDiskReport() {
+  /** Get lists of blocks on the disk sorted by blockId, per blockpool */
+  private Map<String, ScanInfo[]> getDiskReport() {
     // First get list of data directories
-    FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
-    ArrayList<LinkedList<ScanInfo>> dirReports =
-      new ArrayList<LinkedList<ScanInfo>>(volumes.length);
+    List<FSVolume> volumes = dataset.volumes.getVolumes();
+    ArrayList<ScanInfoPerBlockPool> dirReports =
+      new ArrayList<ScanInfoPerBlockPool>(volumes.size());
     
-    Map<Integer, Future<LinkedList<ScanInfo>>> compilersInProgress =
-      new HashMap<Integer, Future<LinkedList<ScanInfo>>>();
-    for (int i = 0; i < volumes.length; i++) {
-      if (!dataset.volumes.isValid(volumes[i])) { // volume is still valid
+    Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
+      new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
+    for (int i = 0; i < volumes.size(); i++) {
+      if (!dataset.volumes.isValid(volumes.get(i))) { // volume is no longer valid, skip it
         dirReports.add(i, null);
       } else {
         ReportCompiler reportCompiler =
-          new ReportCompiler(volumes[i], volumes[i].getDir());
-        Future<LinkedList<ScanInfo>> result = 
+          new ReportCompiler(volumes.get(i));
+        Future<ScanInfoPerBlockPool> result = 
           reportCompileThreadPool.submit(reportCompiler);
         compilersInProgress.put(i, result);
       }
     }
     
-    for (Map.Entry<Integer, Future<LinkedList<ScanInfo>>> report :
-      compilersInProgress.entrySet()) {
+    for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
+        compilersInProgress.entrySet()) {
       try {
         dirReports.add(report.getKey(), report.getValue().get());
       } catch (Exception ex) {
@@ -287,17 +441,14 @@ public class DirectoryScanner {
     }
 
     // Compile consolidated report for all the volumes
-    LinkedList<ScanInfo> list = new LinkedList<ScanInfo>();
-    for (int i = 0; i < volumes.length; i++) {
-      if (dataset.volumes.isValid(volumes[i])) { // volume is still valid
+    ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
+    for (int i = 0; i < volumes.size(); i++) {
+      if (dataset.volumes.isValid(volumes.get(i))) { // volume is still valid
         list.addAll(dirReports.get(i));
       }
     }
 
-    ScanInfo[] report = list.toArray(new ScanInfo[list.size()]);
-    // Sort the report based on blockId
-    Arrays.sort(report);
-    return report;
+    return list.toSortedArrays();
   }
 
   private static boolean isBlockMetaFile(String blockId, String metaFile) {
@@ -305,19 +456,23 @@ public class DirectoryScanner {
         && metaFile.endsWith(Block.METADATA_EXTENSION);
   }
 
-  private static class ReportCompiler implements Callable<LinkedList<ScanInfo>> {
+  private static class ReportCompiler 
+  implements Callable<ScanInfoPerBlockPool> {
     private FSVolume volume;
-    private File dir;
 
-    public ReportCompiler(FSVolume volume, File dir) {
-      this.dir = dir;
+    public ReportCompiler(FSVolume volume) {
       this.volume = volume;
     }
 
     @Override
-    public LinkedList<ScanInfo> call() throws Exception {
-      LinkedList<ScanInfo> result = new LinkedList<ScanInfo>();
-      compileReport(volume, dir, result);
+    public ScanInfoPerBlockPool call() throws Exception {
+      String[] bpList = volume.getBlockPoolList();
+      ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
+      for (String bpid : bpList) {
+        LinkedList<ScanInfo> report = new LinkedList<ScanInfo>();
+        File bpFinalizedDir = volume.getBlockPoolSlice(bpid).getFinalizedDir();
+        result.put(bpid, compileReport(volume, bpFinalizedDir, report));
+      }
       return result;
     }
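
As a reading aid for the new per-block-pool report plumbing: the sketch below mirrors, in a self-contained form, what ScanInfoPerBlockPool.addAll() and toSortedArrays() do. Per-volume reports keyed by block pool id are merged, then each pool's entries are sorted by block id. The Entry type and the sample ids are stand-ins for ScanInfo and are not part of this patch.

    // Simplified, standalone illustration of the per-block-pool merge and sort.
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;

    public class PerBlockPoolReportSketch {
      // Stand-in for ScanInfo: only carries a block id and sorts by it.
      static class Entry implements Comparable<Entry> {
        final long blockId;
        Entry(long blockId) { this.blockId = blockId; }
        public int compareTo(Entry o) {
          return blockId < o.blockId ? -1 : (blockId == o.blockId ? 0 : 1);
        }
        public String toString() { return "blk_" + blockId; }
      }

      /** Merge one volume's per-pool report into the consolidated report. */
      static void addAll(Map<String, LinkedList<Entry>> into,
                         Map<String, LinkedList<Entry>> from) {
        for (Map.Entry<String, LinkedList<Entry>> e : from.entrySet()) {
          if (into.containsKey(e.getKey())) {
            into.get(e.getKey()).addAll(e.getValue()); // merge this pool's list
          } else {
            into.put(e.getKey(), e.getValue());        // first report for this pool
          }
        }
      }

      /** Convert each pool's merged list into a blockId-sorted array. */
      static Map<String, Entry[]> toSortedArrays(Map<String, LinkedList<Entry>> report) {
        Map<String, Entry[]> result = new HashMap<String, Entry[]>(report.size());
        for (Map.Entry<String, LinkedList<Entry>> e : report.entrySet()) {
          Entry[] record = e.getValue().toArray(new Entry[e.getValue().size()]);
          Arrays.sort(record);
          result.put(e.getKey(), record);
        }
        return result;
      }

      public static void main(String[] args) {
        Map<String, LinkedList<Entry>> vol1 = new HashMap<String, LinkedList<Entry>>();
        vol1.put("BP-1", new LinkedList<Entry>(Arrays.asList(new Entry(7), new Entry(3))));
        Map<String, LinkedList<Entry>> vol2 = new HashMap<String, LinkedList<Entry>>();
        vol2.put("BP-1", new LinkedList<Entry>(Arrays.asList(new Entry(5))));
        vol2.put("BP-2", new LinkedList<Entry>(Arrays.asList(new Entry(1))));

        Map<String, LinkedList<Entry>> consolidated = new HashMap<String, LinkedList<Entry>>();
        addAll(consolidated, vol1);
        addAll(consolidated, vol2);

        // Expected: BP-1 -> [blk_3, blk_5, blk_7], BP-2 -> [blk_1]
        for (Map.Entry<String, Entry[]> e : toSortedArrays(consolidated).entrySet()) {
          System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
        }
      }
    }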
 


