hadoop-hdfs-commits mailing list archives

From: sur...@apache.org
Subject: svn commit: r1407217 [5/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/main/proto/ src/contrib/bkjournal/src/test/j...
Date: Thu, 08 Nov 2012 19:10:04 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Nov  8 19:09:46 2012
@@ -44,10 +44,10 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -57,6 +57,8 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.util.ByteArray;
 
 import com.google.common.base.Preconditions;
@@ -140,7 +142,7 @@ public class FSDirectory implements Clos
         DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
         DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
     NameNode.LOG.info("Caching file names occuring more than " + threshold
-        + " times ");
+        + " times");
     nameCache = new NameCache<ByteArray>(threshold);
     namesystem = ns;
   }
@@ -253,15 +255,12 @@ public class FSDirectory implements Clos
       writeUnlock();
     }
     if (newNode == null) {
-      NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
-                                   +"failed to add "+path
-                                   +" to the file system");
+      NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
       return null;
     }
 
     if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
-          +path+" is added to the file system");
+      NameNode.stateChangeLog.debug("DIR* addFile: " + path + " is added");
     }
     return newNode;
   }
@@ -315,7 +314,7 @@ public class FSDirectory implements Clos
       }
       if(newParent == null)
         return null;
-      if(!newNode.isDirectory() && !newNode.isLink()) {
+      if(!newNode.isDirectory() && !newNode.isSymlink()) {
         // Add file->block mapping
         INodeFile newF = (INodeFile)newNode;
         BlockInfo[] blocks = newF.getBlocks();
@@ -332,22 +331,18 @@ public class FSDirectory implements Clos
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  BlockInfo addBlock(String path,
-                     INode[] inodes,
-                     Block block,
-                     DatanodeDescriptor targets[]
-  ) throws QuotaExceededException {
+  BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
+      DatanodeDescriptor targets[]) throws IOException {
     waitForReady();
 
     writeLock();
     try {
-      assert inodes[inodes.length-1].isUnderConstruction() :
-        "INode should correspond to a file under construction";
-      INodeFileUnderConstruction fileINode = 
-        (INodeFileUnderConstruction)inodes[inodes.length-1];
+      final INode[] inodes = inodesInPath.getINodes();
+      final INodeFileUnderConstruction fileINode = 
+          INodeFileUnderConstruction.valueOf(inodes[inodes.length-1], path);
 
       // check quota limits and updated space consumed
-      updateCount(inodes, inodes.length-1, 0,
+      updateCount(inodesInPath, inodes.length-1, 0,
           fileINode.getPreferredBlockSize()*fileINode.getBlockReplication(), true);
 
       // associate new last block for the file
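
Note on the hunk above: it trades an assert-guarded cast for a validating factory call. The factory body is not part of this message (INodeFileUnderConstruction.java is in another part of this commit); a minimal sketch of it, inferred only from the call sites in this diff, would be:

    // Sketch reconstructed from usage in this diff, not the committed source.
    static INodeFileUnderConstruction valueOf(INode inode, String path)
        throws IOException {
      final INodeFile file = INodeFile.valueOf(inode, path);
      if (!file.isUnderConstruction()) {
        throw new IOException("File is not under construction: " + path);
      }
      return (INodeFileUnderConstruction) file;
    }

Unlike the old assert, a check of this shape also runs with assertions disabled, which matches addBlock now declaring throws IOException instead of only QuotaExceededException.
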
@@ -443,8 +438,9 @@ public class FSDirectory implements Clos
     }
 
     // update space consumed
-    INode[] pathINodes = getExistingPathINodes(path);
-    updateCount(pathINodes, pathINodes.length-1, 0,
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, true);
+    final INode[] inodes = inodesInPath.getINodes();
+    updateCount(inodesInPath, inodes.length-1, 0,
         -fileNode.getPreferredBlockSize()*fileNode.getBlockReplication(), true);
   }
 
@@ -512,7 +508,8 @@ public class FSDirectory implements Clos
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException {
     assert hasWriteLock();
-    INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    INode[] srcInodes = srcInodesInPath.getINodes();
     INode srcInode = srcInodes[srcInodes.length-1];
     
     // check the validation of the source
@@ -535,7 +532,7 @@ public class FSDirectory implements Clos
     if (dst.equals(src)) {
       return true;
     }
-    if (srcInode.isLink() && 
+    if (srcInode.isSymlink() && 
         dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
@@ -551,8 +548,9 @@ public class FSDirectory implements Clos
     }
     
     byte[][] dstComponents = INode.getPathComponents(dst);
-    INode[] dstInodes = new INode[dstComponents.length];
-    rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+    INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
+        dstComponents.length, false);
+    INode[] dstInodes = dstInodesInPath.getINodes();
     if (dstInodes[dstInodes.length-1] != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                    +"failed to rename "+src+" to "+dst+ 
@@ -567,14 +565,14 @@ public class FSDirectory implements Clos
     }
     
     // Ensure dst has quota to accommodate rename
-    verifyQuotaForRename(srcInodes,dstInodes);
+    verifyQuotaForRename(srcInodes, dstInodes);
     
     INode dstChild = null;
     INode srcChild = null;
     String srcChildName = null;
     try {
       // remove src
-      srcChild = removeChild(srcInodes, srcInodes.length-1);
+      srcChild = removeChild(srcInodesInPath, srcInodes.length-1);
       if (srcChild == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
@@ -585,7 +583,7 @@ public class FSDirectory implements Clos
       srcChild.setLocalName(dstComponents[dstInodes.length-1]);
       
       // add src to the destination
-      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length-1,
           srcChild, UNKNOWN_DISK_SPACE);
       if (dstChild != null) {
         srcChild = null;
@@ -602,7 +600,7 @@ public class FSDirectory implements Clos
       if (dstChild == null && srcChild != null) {
         // put it back
         srcChild.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
+        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, srcChild, 
             UNKNOWN_DISK_SPACE);
       }
     }
@@ -635,7 +633,8 @@ public class FSDirectory implements Clos
       }
     }
     String error = null;
-    final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    final INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
+    final INode[] srcInodes = srcInodesInPath.getINodes();
     final INode srcInode = srcInodes[srcInodes.length - 1];
     // validate source
     if (srcInode == null) {
@@ -656,7 +655,7 @@ public class FSDirectory implements Clos
       throw new FileAlreadyExistsException(
           "The source "+src+" and destination "+dst+" are the same");
     }
-    if (srcInode.isLink() && 
+    if (srcInode.isSymlink() && 
         dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
@@ -671,8 +670,9 @@ public class FSDirectory implements Clos
       throw new IOException(error);
     }
     final byte[][] dstComponents = INode.getPathComponents(dst);
-    final INode[] dstInodes = new INode[dstComponents.length];
-    rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+    INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
+        dstComponents.length, false);
+    final INode[] dstInodes = dstInodesInPath.getINodes();
     INode dstInode = dstInodes[dstInodes.length - 1];
     if (dstInodes.length == 1) {
       error = "rename destination cannot be the root";
@@ -696,7 +696,7 @@ public class FSDirectory implements Clos
         throw new FileAlreadyExistsException(error);
       }
       List<INode> children = dstInode.isDirectory() ? 
-          ((INodeDirectory) dstInode).getChildrenRaw() : null;
+          ((INodeDirectory) dstInode).getChildren() : null;
       if (children != null && children.size() != 0) {
         error = "rename cannot overwrite non empty destination directory "
             + dst;
@@ -720,7 +720,7 @@ public class FSDirectory implements Clos
 
     // Ensure dst has quota to accommodate rename
     verifyQuotaForRename(srcInodes, dstInodes);
-    INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+    INode removedSrc = removeChild(srcInodesInPath, srcInodes.length - 1);
     if (removedSrc == null) {
       error = "Failed to rename " + src + " to " + dst
           + " because the source can not be removed";
@@ -733,14 +733,14 @@ public class FSDirectory implements Clos
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
-        removedDst = removeChild(dstInodes, dstInodes.length - 1);
+        removedDst = removeChild(dstInodesInPath, dstInodes.length - 1);
         dstChildName = removedDst.getLocalName();
       }
 
       INode dstChild = null;
       removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
       // add src as dst to complete rename
-      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1,
           removedSrc, UNKNOWN_DISK_SPACE);
 
       int filesDeleted = 0;
@@ -758,7 +758,7 @@ public class FSDirectory implements Clos
         if (removedDst != null) {
           INode rmdst = removedDst;
           removedDst = null;
-          List<Block> collectedBlocks = new ArrayList<Block>();
+          BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
           filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
           getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
         }
@@ -768,13 +768,13 @@ public class FSDirectory implements Clos
       if (removedSrc != null) {
         // Rename failed - restore src
         removedSrc.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
+        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, removedSrc, 
             UNKNOWN_DISK_SPACE);
       }
       if (removedDst != null) {
         // Rename failed - restore dst
         removedDst.setLocalName(dstChildName);
-        addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
+        addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1, removedDst, 
             UNKNOWN_DISK_SPACE);
       }
     }
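
Several hunks in this file (here, and in delete and unprotectedDelete below) replace the raw List<Block> of collected blocks with INode.BlocksMapUpdateInfo, imported at the top of the diff. Its definition is not included in this message; judging purely from the call sites, it is a thin named wrapper over the block list, along these lines (member names are guesses):

    // Hedged reconstruction from usage only; not the committed source.
    public static class BlocksMapUpdateInfo {
      private final List<Block> toDeleteList = new ArrayList<Block>();

      public List<Block> getToDeleteList() {
        return toDeleteList;
      }

      public void addDeleteBlock(Block toDelete) {
        toDeleteList.add(toDelete);
      }
    }

Giving the delete path its own type lets later bookkeeping be attached to block removal without touching every signature again.
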
@@ -814,12 +814,13 @@ public class FSDirectory implements Clos
                                     UnresolvedLinkException {
     assert hasWriteLock();
 
-    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
     INode inode = inodes[inodes.length - 1];
     if (inode == null) {
       return null;
     }
-    assert !inode.isLink();
+    assert !inode.isSymlink();
     if (inode.isDirectory()) {
       return null;
     }
@@ -828,7 +829,7 @@ public class FSDirectory implements Clos
 
     // check disk quota
     long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
-    updateCount(inodes, inodes.length-1, 0, dsDelta, true);
+    updateCount(inodesInPath, inodes.length-1, 0, dsDelta, true);
 
     fileNode.setReplication(replication);
 
@@ -851,7 +852,7 @@ public class FSDirectory implements Clos
       if (inode == null) {
         throw new FileNotFoundException("File does not exist: " + filename);
       }
-      if (inode.isDirectory() || inode.isLink()) {
+      if (inode.isDirectory() || inode.isSymlink()) {
         throw new IOException("Getting block size of non-file: "+ filename); 
       }
       return ((INodeFile)inode).getPreferredBlockSize();
@@ -868,7 +869,7 @@ public class FSDirectory implements Clos
       if (inode == null) {
          return false;
       }
-      return inode.isDirectory() || inode.isLink() 
+      return inode.isDirectory() || inode.isSymlink() 
         ? true 
         : ((INodeFile)inode).getBlocks() != null;
     } finally {
@@ -958,7 +959,8 @@ public class FSDirectory implements Clos
     }
     // do the move
     
-    INode [] trgINodes =  getExistingPathINodes(target);
+    final INodesInPath trgINodesInPath = rootDir.getExistingPathINodes(target, true);
+    final INode[] trgINodes = trgINodesInPath.getINodes();
     INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
     INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
     
@@ -966,9 +968,9 @@ public class FSDirectory implements Clos
     int i = 0;
     int totalBlocks = 0;
     for(String src : srcs) {
-      INodeFile srcInode = getFileINode(src);
+      INodeFile srcInode = (INodeFile)getINode(src);
       allSrcInodes[i++] = srcInode;
-      totalBlocks += srcInode.blocks.length;  
+      totalBlocks += srcInode.numBlocks();  
     }
     trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
     
@@ -977,7 +979,7 @@ public class FSDirectory implements Clos
     for(INodeFile nodeToRemove: allSrcInodes) {
       if(nodeToRemove == null) continue;
       
-      nodeToRemove.blocks = null;
+      nodeToRemove.setBlocks(null);
       trgParent.removeChild(nodeToRemove);
       count++;
     }
@@ -985,7 +987,7 @@ public class FSDirectory implements Clos
     trgInode.setModificationTimeForce(timestamp);
     trgParent.setModificationTime(timestamp);
     // update quota on the parent directory ('count' files removed, 0 space)
-    unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
+    unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
   }
 
   /**
@@ -995,7 +997,7 @@ public class FSDirectory implements Clos
    * @param collectedBlocks Blocks under the deleted directory
    * @return true on successful deletion; else false
    */
-  boolean delete(String src, List<Block>collectedBlocks) 
+  boolean delete(String src, BlocksMapUpdateInfo collectedBlocks) 
     throws UnresolvedLinkException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
@@ -1019,35 +1021,21 @@ public class FSDirectory implements Clos
     return true;
   }
   
-  /** Return if a directory is empty or not **/
-  boolean isDirEmpty(String src) throws UnresolvedLinkException {
-    boolean dirNotEmpty = true;
-    if (!isDir(src)) {
-      return true;
-    }
+  /**
+   * @return true if the path is a non-empty directory; otherwise, return false.
+   */
+  boolean isNonEmptyDirectory(String path) throws UnresolvedLinkException {
     readLock();
     try {
-      INode targetNode = rootDir.getNode(src, false);
-      assert targetNode != null : "should be taken care in isDir() above";
-      if (((INodeDirectory)targetNode).getChildren().size() != 0) {
-        dirNotEmpty = false;
+      final INode inode = rootDir.getNode(path, false);
+      if (inode == null || !inode.isDirectory()) {
+        //not found or not a directory
+        return false;
       }
+      return ((INodeDirectory)inode).getChildrenList().size() != 0;
     } finally {
       readUnlock();
     }
-    return dirNotEmpty;
-  }
-
-  boolean isEmpty() {
-    try {
-      return isDirEmpty("/");
-    } catch (UnresolvedLinkException e) {
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("/ cannot be a symlink");
-      }
-      assert false : "/ cannot be a symlink";
-      return true;
-    }
   }
 
   /**
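
The hunk above inverts the old isDirEmpty predicate into isNonEmptyDirectory, folds the separate isDir check into a single locked read, and drops the isEmpty() helper entirely. The inverted form spares callers a double negative; a hypothetical non-recursive delete check (no such caller appears in this message) would read:

    // Illustrative caller only; not part of this commit message.
    if (!recursive && dir.isNonEmptyDirectory(src)) {
      throw new IOException(src + " is non empty");
    }
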
@@ -1062,7 +1050,7 @@ public class FSDirectory implements Clos
   void unprotectedDelete(String src, long mtime) 
     throws UnresolvedLinkException {
     assert hasWriteLock();
-    List<Block> collectedBlocks = new ArrayList<Block>();
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     int filesRemoved = unprotectedDelete(src, collectedBlocks, mtime);
     if (filesRemoved > 0) {
       getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
@@ -1077,12 +1065,13 @@ public class FSDirectory implements Clos
    * @param mtime the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
    */ 
-  int unprotectedDelete(String src, List<Block> collectedBlocks, 
+  int unprotectedDelete(String src, BlocksMapUpdateInfo collectedBlocks, 
       long mtime) throws UnresolvedLinkException {
     assert hasWriteLock();
     src = normalizePath(src);
 
-    INode[] inodes =  rootDir.getExistingPathINodes(src, false);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, false);
+    final INode[] inodes = inodesInPath.getINodes();
     INode targetNode = inodes[inodes.length-1];
 
     if (targetNode == null) { // non-existent src
@@ -1100,7 +1089,7 @@ public class FSDirectory implements Clos
     }
     int pos = inodes.length - 1;
     // Remove the node from the namespace
-    targetNode = removeChild(inodes, pos);
+    targetNode = removeChild(inodesInPath, pos);
     if (targetNode == null) {
       return 0;
     }
@@ -1171,7 +1160,7 @@ public class FSDirectory implements Clos
                 targetNode, needLocation)}, 0);
       }
       INodeDirectory dirInode = (INodeDirectory)targetNode;
-      List<INode> contents = dirInode.getChildren();
+      List<INode> contents = dirInode.getChildrenList();
       int startChild = dirInode.nextChild(startAfter);
       int totalNumChildren = contents.size();
       int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
@@ -1222,7 +1211,7 @@ public class FSDirectory implements Clos
         return null;
       if (targetNode.isDirectory())
         return null;
-      if (targetNode.isLink()) 
+      if (targetNode.isSymlink()) 
         return null;
       return ((INodeFile)targetNode).getBlocks();
     } finally {
@@ -1231,46 +1220,12 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Get {@link INode} associated with the file.
-   */
-  INodeFile getFileINode(String src) throws UnresolvedLinkException {
-    INode inode = getINode(src);
-    if (inode == null || inode.isDirectory())
-      return null;
-    assert !inode.isLink();
-    return (INodeFile) inode;
-  }
-  
-  /**
    * Get {@link INode} associated with the file / directory.
    */
   INode getINode(String src) throws UnresolvedLinkException {
     readLock();
     try {
-      INode iNode = rootDir.getNode(src, true);
-      return iNode;
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Retrieve the existing INodes along the given path.
-   * 
-   * @param path the path to explore
-   * @return INodes array containing the existing INodes in the order they
-   *         appear when following the path from the root INode to the
-   *         deepest INodes. The array size will be the number of expected
-   *         components in the path, and non existing components will be
-   *         filled with null
-   *         
-   * @see INodeDirectory#getExistingPathINodes(byte[][], INode[])
-   */
-  INode[] getExistingPathINodes(String path) 
-    throws UnresolvedLinkException {
-    readLock();
-    try {
-      return rootDir.getExistingPathINodes(path, true);
+      return rootDir.getNode(src, true);
     } finally {
       readUnlock();
     }
@@ -1340,13 +1295,14 @@ public class FSDirectory implements Clos
                                                 UnresolvedLinkException {
     writeLock();
     try {
-      INode[] inodes = rootDir.getExistingPathINodes(path, false);
+      final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false);
+      final INode[] inodes = inodesInPath.getINodes();
       int len = inodes.length;
       if (inodes[len - 1] == null) {
         throw new FileNotFoundException(path + 
                                         " does not exist under rootDir.");
       }
-      updateCount(inodes, len-1, nsDelta, dsDelta, true);
+      updateCount(inodesInPath, len-1, nsDelta, dsDelta, true);
     } finally {
       writeUnlock();
     }
@@ -1361,7 +1317,7 @@ public class FSDirectory implements Clos
    * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private void updateCount(INode[] inodes, int numOfINodes, 
+  private void updateCount(INodesInPath inodesInPath, int numOfINodes, 
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     assert hasWriteLock();
@@ -1369,29 +1325,25 @@ public class FSDirectory implements Clos
       //still initializing. do not check or update quotas.
       return;
     }
-    if (numOfINodes>inodes.length) {
+    final INode[] inodes = inodesInPath.getINodes();
+    if (numOfINodes > inodes.length) {
       numOfINodes = inodes.length;
     }
     if (checkQuota) {
       verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
     }
-    for(int i = 0; i < numOfINodes; i++) {
-      if (inodes[i].isQuotaSet()) { // a directory with quota
-        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-        node.updateNumItemsInTree(nsDelta, dsDelta);
-      }
-    }
+    unprotectedUpdateCount(inodesInPath, numOfINodes, nsDelta, dsDelta);
   }
   
   /** 
    * update quota of each inode and check to see if quota is exceeded. 
    * See {@link #updateCount(INode[], int, long, long, boolean)}
    */ 
-  private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes, 
-                           long nsDelta, long dsDelta) {
+  private void updateCountNoQuotaCheck(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
     assert hasWriteLock();
     try {
-      updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
+      updateCount(inodesInPath, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
     }
@@ -1405,9 +1357,10 @@ public class FSDirectory implements Clos
    * @param nsDelta
    * @param dsDelta
    */
-   void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
-                                      long nsDelta, long dsDelta) {
-     assert hasWriteLock();
+  private void unprotectedUpdateCount(INodesInPath inodesInPath,
+      int numOfINodes, long nsDelta, long dsDelta) {
+    assert hasWriteLock();
+    final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
         INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
@@ -1472,18 +1425,19 @@ public class FSDirectory implements Clos
     src = normalizePath(src);
     String[] names = INode.getPathNames(src);
     byte[][] components = INode.getPathComponents(names);
-    INode[] inodes = new INode[components.length];
-    final int lastInodeIndex = inodes.length - 1;
+    final int lastInodeIndex = components.length - 1;
 
     writeLock();
     try {
-      rootDir.getExistingPathINodes(components, inodes, false);
+      INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
+          components.length, false);
+      INode[] inodes = inodesInPath.getINodes();
 
       // find the index of the first null in inodes[]
       StringBuilder pathbuilder = new StringBuilder();
       int i = 1;
       for(; i < inodes.length && inodes[i] != null; i++) {
-        pathbuilder.append(Path.SEPARATOR + names[i]);
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
         if (!inodes[i].isDirectory()) {
           throw new FileAlreadyExistsException("Parent path is not a directory: "
               + pathbuilder+ " "+inodes[i].getLocalName());
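
One easily missed change in the hunk above: pathbuilder.append(Path.SEPARATOR + names[i]) becomes pathbuilder.append(Path.SEPARATOR).append(names[i]). The first form concatenates into a temporary String (through a hidden second StringBuilder) before appending; the chained form writes both pieces straight into pathbuilder. In isolation:

    String sep = Path.SEPARATOR;   // "/"
    String name = "subdir";        // illustrative value
    StringBuilder b = new StringBuilder();
    b.append(sep + name);          // builds an intermediate "/subdir" String
    b.append(sep).append(name);    // appends directly, no intermediate

The same pattern in the directory-creation loop a few hunks below is left unchanged by this commit.
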
@@ -1525,8 +1479,7 @@ public class FSDirectory implements Clos
       // create directories beginning from the first null index
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
-        String cur = pathbuilder.toString();
-        unprotectedMkdir(inodes, i, components[i],
+        unprotectedMkdir(inodesInPath, i, components[i],
             (i < lastInodeIndex) ? parentPermissions : permissions, now);
         if (inodes[i] == null) {
           return false;
@@ -1535,6 +1488,8 @@ public class FSDirectory implements Clos
         // to match count of FilesDeleted metric.
         if (getFSNamesystem() != null)
           NameNode.getNameNodeMetrics().incrFilesCreated();
+
+        final String cur = pathbuilder.toString();
         fsImage.getEditLog().logMkDir(cur, inodes[i]);
         if(NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
@@ -1547,49 +1502,48 @@ public class FSDirectory implements Clos
     return true;
   }
 
-  /**
-   */
   INode unprotectedMkdir(String src, PermissionStatus permissions,
                           long timestamp) throws QuotaExceededException,
                           UnresolvedLinkException {
     assert hasWriteLock();
     byte[][] components = INode.getPathComponents(src);
-    INode[] inodes = new INode[components.length];
-
-    rootDir.getExistingPathINodes(components, inodes, false);
-    unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
-        permissions, timestamp);
-    return inodes[inodes.length-1];
+    INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
+        components.length, false);
+    INode[] inodes = inodesInPath.getINodes();
+    final int pos = inodes.length - 1;
+    unprotectedMkdir(inodesInPath, pos, components[pos], permissions, timestamp);
+    return inodes[pos];
   }
 
   /** create a directory at index pos.
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private void unprotectedMkdir(INode[] inodes, int pos,
+  private void unprotectedMkdir(INodesInPath inodesInPath, int pos,
       byte[] name, PermissionStatus permission,
       long timestamp) throws QuotaExceededException {
     assert hasWriteLock();
-    inodes[pos] = addChild(inodes, pos, 
-        new INodeDirectory(name, permission, timestamp),
-        -1);
+    final INodeDirectory dir = new INodeDirectory(name, permission, timestamp);
+    final INode inode = addChild(inodesInPath, pos, dir, -1, true);
+    inodesInPath.setINode(pos, inode);
   }
   
   /** Add a node child to the namespace. The full path name of the node is src.
    * childDiskspace should be -1, if unknown. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addNode(String src, T child, 
-        long childDiskspace) 
-  throws QuotaExceededException, UnresolvedLinkException {
+   * @throws QuotaExceededException if it violates the quota limit
+   */
+  private <T extends INode> T addNode(String src, T child, long childDiskspace
+      ) throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     byte[] path = components[components.length-1];
     child.setLocalName(path);
     cacheName(child);
-    INode[] inodes = new INode[components.length];
     writeLock();
     try {
-      rootDir.getExistingPathINodes(components, inodes, false);
-      return addChild(inodes, inodes.length-1, child, childDiskspace);
+      INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
+          components.length, false);
+      return addChild(inodesInPath, inodesInPath.getINodes().length-1, child,
+          childDiskspace, true);
     } finally {
       writeUnlock();
     }
@@ -1695,7 +1649,7 @@ public class FSDirectory implements Clos
       }
       if (maxDirItems != 0) {
         INodeDirectory parent = (INodeDirectory)pathComponents[pos-1];
-        int count = parent.getChildren().size();
+        int count = parent.getChildrenList().size();
         if (count >= maxDirItems) {
           throw new MaxDirectoryItemsExceededException(maxDirItems, count);
         }
@@ -1714,19 +1668,22 @@ public class FSDirectory implements Clos
   }
   
   /** Add a node child to the inodes at index pos. 
-   * Its ancestors are stored at [0, pos-1]. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addChild(INode[] pathComponents, int pos,
+   * Its ancestors are stored at [0, pos-1].
+   * @return the added node. 
+   * @throws QuotaExceededException if it violates the quota limit
+   */
+  private <T extends INode> T addChild(INodesInPath inodesInPath, int pos,
       T child, long childDiskspace,
       boolean checkQuota) throws QuotaExceededException {
-	// The filesystem limits are not really quotas, so this check may appear
-	// odd.  It's because a rename operation deletes the src, tries to add
-	// to the dest, if that fails, re-adds the src from whence it came.
-	// The rename code disables the quota when it's restoring to the
-	// original location becase a quota violation would cause the the item
-	// to go "poof".  The fs limits must be bypassed for the same reason.
+    final INode[] inodes = inodesInPath.getINodes();
+    // The filesystem limits are not really quotas, so this check may appear
+    // odd. It's because a rename operation deletes the src, tries to add
+    // to the dest, if that fails, re-adds the src from whence it came.
+    // The rename code disables the quota when it's restoring to the
+    // original location because a quota violation would cause the item
+    // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
-      verifyFsLimits(pathComponents, pos, child);
+      verifyFsLimits(inodes, pos, child);
     }
     
     INode.DirCounts counts = new INode.DirCounts();
@@ -1734,31 +1691,22 @@ public class FSDirectory implements Clos
     if (childDiskspace < 0) {
       childDiskspace = counts.getDsCount();
     }
-    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace,
-        checkQuota);
-    if (pathComponents[pos-1] == null) {
+    updateCount(inodesInPath, pos, counts.getNsCount(), childDiskspace, checkQuota);
+    if (inodes[pos-1] == null) {
       throw new NullPointerException("Panic: parent does not exist");
     }
-    T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
-        child, true);
+    final T addedNode = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
     if (addedNode == null) {
-      updateCount(pathComponents, pos, -counts.getNsCount(), 
-          -childDiskspace, true);
+      updateCount(inodesInPath, pos, -counts.getNsCount(), -childDiskspace, true);
     }
     return addedNode;
   }
-
-  private <T extends INode> T addChild(INode[] pathComponents, int pos,
-      T child, long childDiskspace)
-      throws QuotaExceededException {
-    return addChild(pathComponents, pos, child, childDiskspace, true);
-  }
   
-  private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
+  private <T extends INode> T addChildNoQuotaCheck(INodesInPath inodesInPath,
       int pos, T child, long childDiskspace) {
     T inode = null;
     try {
-      inode = addChild(pathComponents, pos, child, childDiskspace, false);
+      inode = addChild(inodesInPath, pos, child, childDiskspace, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); 
     }
@@ -1770,13 +1718,13 @@ public class FSDirectory implements Clos
    * Count of each ancestor with quota is also updated.
    * Return the removed node; null if the removal fails.
    */
-  private INode removeChild(INode[] pathComponents, int pos) {
-    INode removedNode = 
-      ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
+  private INode removeChild(final INodesInPath inodesInPath, int pos) {
+    final INode[] inodes = inodesInPath.getINodes();
+    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
-      updateCountNoQuotaCheck(pathComponents, pos,
+      updateCountNoQuotaCheck(inodesInPath, pos,
                   -counts.getNsCount(), -counts.getDsCount());
     }
     return removedNode;
@@ -1844,11 +1792,11 @@ public class FSDirectory implements Clos
      * INode. using 'parent' is not currently recommended. */
     nodesInPath.add(dir);
 
-    for (INode child : dir.getChildren()) {
+    for (INode child : dir.getChildrenList()) {
       if (child.isDirectory()) {
         updateCountForINodeWithQuota((INodeDirectory)child, 
                                      counts, nodesInPath);
-      } else if (child.isLink()) {
+      } else if (child.isSymlink()) {
         counts.nsCount += 1;
       } else { // reduce recursive calls
         counts.nsCount += 1;
@@ -1911,7 +1859,8 @@ public class FSDirectory implements Clos
     
     String srcs = normalizePath(src);
 
-    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
+    final INode[] inodes = inodesInPath.getINodes();
     INode targetNode = inodes[inodes.length-1];
     if (targetNode == null) {
       throw new FileNotFoundException("Directory does not exist: " + srcs);
@@ -2077,7 +2026,7 @@ public class FSDirectory implements Clos
         node.getFsPermission(),
         node.getUserName(),
         node.getGroupName(),
-        node.isLink() ? ((INodeSymlink)node).getSymlink() : null,
+        node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
         path);
   }
 
@@ -2113,7 +2062,7 @@ public class FSDirectory implements Clos
           node.getFsPermission(),
           node.getUserName(),
           node.getGroupName(),
-          node.isLink() ? ((INodeSymlink)node).getSymlink() : null,
+          node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
           path,
           loc);
       }
@@ -2145,16 +2094,13 @@ public class FSDirectory implements Clos
       writeUnlock();
     }
     if (newNode == null) {
-      NameNode.stateChangeLog.info("DIR* FSDirectory.addSymlink: "
-                                   +"failed to add "+path
-                                   +" to the file system");
+      NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
       return null;
     }
     fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
     
     if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.addSymlink: "
-          +path+" is added to the file system");
+      NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");
     }
     return newNode;
   }
@@ -2187,7 +2133,7 @@ public class FSDirectory implements Clos
    */
   void cacheName(INode inode) {
     // Name is cached only for files
-    if (inode.isDirectory() || inode.isLink()) {
+    if (inode.isDirectory() || inode.isSymlink()) {
       return;
     }
     ByteArray name = new ByteArray(inode.getLocalNameBytes());

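The FSDirectory changes above collectively replace raw INode[] arrays, and the output-parameter style of rootDir.getExistingPathINodes(components, inodes, false), with the INodesInPath value object imported from INodeDirectory. The class itself is not in this part of the commit; reconstructed from the call sites alone (the by-size construction implied by the new numOfINodes argument, getINodes(), and setINode(pos, inode)), its surface is approximately:

    // Hedged reconstruction; the committed class lives in INodeDirectory.java
    // and may carry more resolution state than shown here.
    public static class INodesInPath {
      private final INode[] inodes;

      INodesInPath(int number) {
        inodes = new INode[number];
      }

      INode[] getINodes() {
        return inodes;
      }

      void setINode(int i, INode inode) {
        inodes[i] = inode;
      }
    }

Returning a value object instead of filling a caller-allocated array lets the resolver size the result itself and leaves room for extra path-resolution state later without another round of signature churn.
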
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Nov  8 19:09:46 2012
@@ -900,7 +900,7 @@ public class FSEditLog implements LogsPu
    * in the new log.
    */
   synchronized long rollEditLog() throws IOException {
-    LOG.info("Rolling edit logs.");
+    LOG.info("Rolling edit logs");
     endCurrentLogSegment(true);
     
     long nextTxId = getLastWrittenTxId() + 1;
@@ -915,7 +915,7 @@ public class FSEditLog implements LogsPu
    */
   public synchronized void startLogSegment(long txid, 
       boolean abortCurrentLogSegment) throws IOException {
-    LOG.info("Namenode started a new log segment at txid " + txid);
+    LOG.info("Started a new log segment at txid " + txid);
     if (isSegmentOpen()) {
       if (getLastWrittenTxId() == txid - 1) {
        //In sync with the NN, so end and finalize the current segment

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Nov  8 19:09:46 2012
@@ -90,7 +90,7 @@ public class FSEditLogLoader {
                                  expectedStartingTxId, recovery);
       FSImage.LOG.info("Edits file " + edits.getName() 
           + " of size " + edits.length() + " edits # " + numEdits 
-          + " loaded in " + (now()-startTime)/1000 + " seconds.");
+          + " loaded in " + (now()-startTime)/1000 + " seconds");
       return numEdits;
     } finally {
       edits.close();
@@ -477,8 +477,8 @@ public class FSEditLogLoader {
       Lease lease = fsNamesys.leaseManager.getLease(
           reassignLeaseOp.leaseHolder);
       INodeFileUnderConstruction pendingFile =
-          (INodeFileUnderConstruction) fsDir.getFileINode(
-              reassignLeaseOp.path);
+          INodeFileUnderConstruction.valueOf( 
+              fsDir.getINode(reassignLeaseOp.path), reassignLeaseOp.path);
       fsNamesys.reassignLeaseInternal(lease,
           reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
       break;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Thu Nov  8 19:09:46 2012
@@ -246,10 +246,8 @@ class FSImageFormat {
    private int loadDirectory(DataInputStream in) throws IOException {
      String parentPath = FSImageSerialization.readString(in);
      FSDirectory fsDir = namesystem.dir;
-     INode parent = fsDir.rootDir.getNode(parentPath, true);
-     if (parent == null || !parent.isDirectory()) {
-       throw new IOException("Path " + parentPath + "is not a directory.");
-     }
+     final INodeDirectory parent = INodeDirectory.valueOf(
+         fsDir.rootDir.getNode(parentPath, true), parentPath);
 
      int numChildren = in.readInt();
      for(int i=0; i<numChildren; i++) {
@@ -259,7 +257,7 @@ class FSImageFormat {
        INode newNode = loadINode(in); // read rest of inode
 
        // add to parent
-       namesystem.dir.addToParent(localName, (INodeDirectory)parent, newNode, false);
+       namesystem.dir.addToParent(localName, parent, newNode, false);
      }
      return numChildren;
    }
@@ -365,14 +363,7 @@ class FSImageFormat {
 
         // verify that file exists in namespace
         String path = cons.getLocalName();
-        INode old = fsDir.getFileINode(path);
-        if (old == null) {
-          throw new IOException("Found lease for non-existent file " + path);
-        }
-        if (old.isDirectory()) {
-          throw new IOException("Found lease for directory " + path);
-        }
-        INodeFile oldnode = (INodeFile) old;
+        INodeFile oldnode = INodeFile.valueOf(fsDir.getINode(path), path);
         fsDir.replaceNode(path, oldnode, cons);
         namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
@@ -539,7 +530,7 @@ class FSImageFormat {
     private void saveImage(ByteBuffer currentDirName,
                                   INodeDirectory current,
                                   DataOutputStream out) throws IOException {
-      List<INode> children = current.getChildrenRaw();
+      List<INode> children = current.getChildren();
       if (children == null || children.isEmpty())
         return;
       // print prefix (parent directory name)

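FSImageFormat now delegates the null-and-type checks that loadDirectory and the lease-recovery path did inline to INodeDirectory.valueOf and INodeFile.valueOf. Assuming those factories mirror the inline checks they replace, the directory variant is roughly:

    // Sketch only, assuming it mirrors the removed inline checks;
    // the committed method is in INodeDirectory.java.
    static INodeDirectory valueOf(INode inode, String path) throws IOException {
      if (inode == null) {
        throw new FileNotFoundException("Directory does not exist: " + path);
      }
      if (!inode.isDirectory()) {
        throw new IOException("Path is not a directory: " + path);
      }
      return (INodeDirectory) inode;
    }

A side benefit: the old inline message with its missing space ("Path " + parentPath + "is not a directory.") disappears in favor of whatever single message the factory produces.
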
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Thu Nov  8 19:09:46 2012
@@ -162,7 +162,7 @@ public class FSImageSerialization {
       PermissionStatus.write(out, node.getUserName(),
                              node.getGroupName(),
                              filePerm);
-    } else if (node.isLink()) {
+    } else if (node.isSymlink()) {
       out.writeShort(0);  // replication
       out.writeLong(0);   // modification time
       out.writeLong(0);   // access time

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Nov  8 19:09:46 2012
@@ -17,20 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -159,6 +159,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
@@ -656,11 +658,11 @@ public class FSNamesystem implements Nam
         editLog.recoverUnclosedStreams();
         
         LOG.info("Catching up to latest edits from old active before " +
-            "taking over writer role in edits logs.");
+            "taking over writer role in edits logs");
         editLogTailer.catchupDuringFailover();
         blockManager.setPostponeBlocksFromFuture(false);
         
-        LOG.info("Reprocessing replication and invalidation queues...");
+        LOG.info("Reprocessing replication and invalidation queues");
         blockManager.getDatanodeManager().markAllDatanodesStale();
         blockManager.clearQueues();
         blockManager.processAllPendingDNMessages();
@@ -1266,11 +1268,7 @@ public class FSNamesystem implements Nam
         }
 
         long now = now();
-        INodeFile inode = dir.getFileINode(src);
-        if (inode == null) {
-          throw new FileNotFoundException("File does not exist: " + src);
-        }
-        assert !inode.isLink();
+        final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
         if (doAccessTime && isAccessTimeSupported()) {
           if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
             // if we have to set access time but we only have the readlock, then
@@ -1386,28 +1384,27 @@ public class FSNamesystem implements Nam
 
     // we put the following prerequisite for the operation
     // replication and blocks sizes should be the same for ALL the blocks
-    // check the target
-    INode inode = dir.getFileINode(target);
-
-    if(inode == null) {
-      throw new IllegalArgumentException("concat: trg file doesn't exist");
-    }
-    if(inode.isUnderConstruction()) {
-      throw new IllegalArgumentException("concat: trg file is uner construction");
-    }
-
-    INodeFile trgInode = (INodeFile) inode;
 
-    // per design trg shouldn't be empty and all the blocks same size
-    if(trgInode.blocks.length == 0) {
-      throw new IllegalArgumentException("concat: "+ target + " file is empty");
+    // check the target
+    final INodeFile trgInode = INodeFile.valueOf(dir.getINode(target), target);
+    if(trgInode.isUnderConstruction()) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is under construction");
+    }
+    // per design target shouldn't be empty and all the blocks same size
+    if(trgInode.numBlocks() == 0) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is empty");
     }
 
     long blockSize = trgInode.getPreferredBlockSize();
 
     // check the end block to be full
-    if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
-      throw new IllegalArgumentException(target + " blocks size should be the same");
+    final BlockInfo last = trgInode.getLastBlock();
+    if(blockSize != last.getNumBytes()) {
+      throw new HadoopIllegalArgumentException("The last block in " + target
+          + " is not full; last block size = " + last.getNumBytes()
+          + " but file block size = " + blockSize);
     }
 
     si.add(trgInode);
@@ -1420,32 +1417,36 @@ public class FSNamesystem implements Nam
       if(i==srcs.length-1)
         endSrc=true;
 
-      INodeFile srcInode = dir.getFileINode(src);
-
+      final INodeFile srcInode = INodeFile.valueOf(dir.getINode(src), src);
       if(src.isEmpty() 
-          || srcInode == null
           || srcInode.isUnderConstruction()
-          || srcInode.blocks.length == 0) {
-        throw new IllegalArgumentException("concat: file " + src + 
-        " is invalid or empty or underConstruction");
+          || srcInode.numBlocks() == 0) {
+        throw new HadoopIllegalArgumentException("concat: source file " + src
+            + " is invalid or empty or underConstruction");
       }
 
       // check replication and blocks size
       if(repl != srcInode.getBlockReplication()) {
-        throw new IllegalArgumentException(src + " and " + target + " " +
-            "should have same replication: "
-            + repl + " vs. " + srcInode.getBlockReplication());
+        throw new HadoopIllegalArgumentException("concat: the soruce file "
+            + src + " and the target file " + target
+            + " should have the same replication: source replication is "
+            + srcInode.getBlockReplication()
+            + " but target replication is " + repl);
       }
 
       //boolean endBlock=false;
       // verify that all the blocks are of the same length as target
       // should be enough to check the end blocks
-      int idx = srcInode.blocks.length-1;
+      final BlockInfo[] srcBlocks = srcInode.getBlocks();
+      int idx = srcBlocks.length-1;
       if(endSrc)
-        idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
-      if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
-        throw new IllegalArgumentException("concat: blocks sizes of " + 
-            src + " and " + target + " should all be the same");
+        idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
+        throw new HadoopIllegalArgumentException("concat: the soruce file "
+            + src + " and the target file " + target
+            + " should have the same blocks sizes: target block size is "
+            + blockSize + " but the size of source block " + idx + " is "
+            + srcBlocks[idx].getNumBytes());
       }
 
       si.add(srcInode);
@@ -1454,7 +1455,8 @@ public class FSNamesystem implements Nam
     // make sure no two files are the same
     if(si.size() < srcs.length+1) { // trg + srcs
       // it means at least two files are the same
-      throw new IllegalArgumentException("at least two files are the same");
+      throw new HadoopIllegalArgumentException(
+          "concat: at least two of the source files are the same");
     }
 
     if(NameNode.stateChangeLog.isDebugEnabled()) {
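
The duplicate check above relies on si (declared in elided context, evidently a Set) holding every INode involved: the target plus all sources. When all srcs.length + 1 files are distinct the set reaches exactly that size; a path listed twice resolves to the same INode and collapses. The idea in isolation (variable names illustrative; requires java.util.HashSet):

    // Set-cardinality duplicate detection, as concat uses above.
    Set<INode> seen = new HashSet<INode>();
    seen.add(trgInode);
    for (INodeFile s : allSrcInodes) {
      seen.add(s);                 // a repeated inode collapses here
    }
    boolean duplicate = seen.size() < allSrcInodes.length + 1;
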
@@ -1666,7 +1668,7 @@ public class FSNamesystem implements Nam
     }
   }
 
-  /*
+  /**
    * Verify that parent directory of src exists.
    */
   private void verifyParentDir(String src) throws FileNotFoundException,
@@ -1674,14 +1676,13 @@ public class FSNamesystem implements Nam
     assert hasReadOrWriteLock();
     Path parent = new Path(src).getParent();
     if (parent != null) {
-      INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
-      INode parentNode = pathINodes[pathINodes.length - 1];
+      final INode parentNode = dir.getINode(parent.toString());
       if (parentNode == null) {
         throw new FileNotFoundException("Parent directory doesn't exist: "
-            + parent.toString());
-      } else if (!parentNode.isDirectory() && !parentNode.isLink()) {
+            + parent);
+      } else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
         throw new ParentNotDirectoryException("Parent path is not a directory: "
-            + parent.toString());
+            + parent);
       }
     }
   }
@@ -1793,13 +1794,9 @@ public class FSNamesystem implements Nam
     }
 
     try {
-      INodeFile myFile = dir.getFileINode(src);
-      try {
-        blockManager.verifyReplication(src, replication, clientMachine);
-      } catch(IOException e) {
-        throw new IOException("failed to create "+e.getMessage());
-      }
+      blockManager.verifyReplication(src, replication, clientMachine);
       boolean create = flag.contains(CreateFlag.CREATE);
+      final INode myFile = dir.getINode(src);
       if (myFile == null) {
         if (!create) {
           throw new FileNotFoundException("failed to overwrite or append to non-existent file "
@@ -1825,8 +1822,9 @@ public class FSNamesystem implements Nam
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
 
       if (append && myFile != null) {
+        final INodeFile f = INodeFile.valueOf(myFile, src); 
         return prepareFileForWrite(
-            src, myFile, holder, clientMachine, clientNode, true);
+            src, f, holder, clientMachine, clientNode, true);
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1920,11 +1918,7 @@ public class FSNamesystem implements Nam
         throw new IOException("Invalid file name: " + src);
       }
   
-      INode inode = dir.getFileINode(src);
-      if (inode == null) {
-        throw new FileNotFoundException("File not found " + src);
-      }
-  
+      final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
       if (!inode.isUnderConstruction()) {
         return true;
       }
@@ -1977,7 +1971,7 @@ public class FSNamesystem implements Nam
       if (force) {
         // close now: no need to wait for soft lease expiration and 
         // close only the file src
-        LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
+        LOG.info("recoverLease: " + lease + ", src=" + src +
           " from client " + pendingFile.getClientName());
         internalReleaseLease(lease, src, holder);
       } else {
@@ -1989,8 +1983,8 @@ public class FSNamesystem implements Nam
         // period, then start lease recovery.
         //
         if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: recover lease " + lease + ", src=" + src +
-              " from client " + pendingFile.getClientName());
+          LOG.info("startFile: recover " + lease + ", src=" + src + " client "
+              + pendingFile.getClientName());
           boolean isClosed = internalReleaseLease(lease, src, null);
           if(!isClosed)
             throw new RecoveryInProgressException(
@@ -2166,7 +2160,7 @@ public class FSNamesystem implements Nam
           }
 
           // The retry case ("b" above) -- abandon the old block.
-          NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +
+          NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
               "caught retry for allocation of a new block in " +
               src + ". Abandoning old block " + lastBlockInFile);
           dir.removeBlock(src, pendingFile, lastBlockInFile);
@@ -2207,18 +2201,18 @@ public class FSNamesystem implements Nam
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot add block to " + src, safeMode);
       }
-      INode[] pathINodes = dir.getExistingPathINodes(src);
-      int inodesLen = pathINodes.length;
-      checkLease(src, clientName, pathINodes[inodesLen-1]);
-      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction) 
-                                                pathINodes[inodesLen - 1];
+
+      final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
+      final INode[] inodes = inodesInPath.getINodes();
+      final INodeFileUnderConstruction pendingFile
+          = checkLease(src, clientName, inodes[inodes.length - 1]);
                                                            
       if (!checkFileProgress(pendingFile, false)) {
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, pathINodes, targets);
+      newBlock = allocateBlock(src, inodesInPath, targets);
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
@@ -2325,35 +2319,32 @@ public class FSNamesystem implements Nam
   private INodeFileUnderConstruction checkLease(String src, String holder) 
       throws LeaseExpiredException, UnresolvedLinkException {
     assert hasReadOrWriteLock();
-    INodeFile file = dir.getFileINode(src);
-    checkLease(src, holder, file);
-    return (INodeFileUnderConstruction)file;
+    return checkLease(src, holder, dir.getINode(src));
   }
 
-  private void checkLease(String src, String holder, INode file)
-      throws LeaseExpiredException {
+  private INodeFileUnderConstruction checkLease(String src, String holder,
+      INode file) throws LeaseExpiredException {
     assert hasReadOrWriteLock();
-    if (file == null || file.isDirectory()) {
+    if (file == null || !(file instanceof INodeFile)) {
       Lease lease = leaseManager.getLease(holder);
-      throw new LeaseExpiredException("No lease on " + src +
-                                      " File does not exist. " +
-                                      (lease != null ? lease.toString() :
-                                       "Holder " + holder + 
-                                       " does not have any open files."));
+      throw new LeaseExpiredException(
+          "No lease on " + src + ": File does not exist. "
+          + (lease != null ? lease.toString()
+              : "Holder " + holder + " does not have any open files."));
     }
     if (!file.isUnderConstruction()) {
       Lease lease = leaseManager.getLease(holder);
-      throw new LeaseExpiredException("No lease on " + src + 
-                                      " File is not open for writing. " +
-                                      (lease != null ? lease.toString() :
-                                       "Holder " + holder + 
-                                       " does not have any open files."));
+      throw new LeaseExpiredException(
+          "No lease on " + src + ": File is not open for writing. "
+          + (lease != null ? lease.toString()
+              : "Holder " + holder + " does not have any open files."));
     }
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
     if (holder != null && !pendingFile.getClientName().equals(holder)) {
       throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
           + pendingFile.getClientName() + " but is accessed by " + holder);
     }
+    return pendingFile;
   }
  
   /**
@@ -2395,20 +2386,20 @@ public class FSNamesystem implements Nam
     try {
       pendingFile = checkLease(src, holder);
     } catch (LeaseExpiredException lee) {
-      INodeFile file = dir.getFileINode(src);
-      if (file != null && !file.isUnderConstruction()) {
+      final INode inode = dir.getINode(src);
+      if (inode != null && inode instanceof INodeFile && !inode.isUnderConstruction()) {
         // This could be a retry RPC - i.e the client tried to close
         // the file, but missed the RPC response. Thus, it is trying
         // again to close the file. If the file still exists and
         // the client's view of the last block matches the actual
         // last block, then we'll treat it as a successful close.
         // See HDFS-3031.
-        Block realLastBlock = file.getLastBlock();
+        final Block realLastBlock = ((INodeFile)inode).getLastBlock();
         if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
-          NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " +
-              "received request from " + holder + " to complete file " + src +
+          NameNode.stateChangeLog.info("DIR* completeFile: " +
+              "request from " + holder + " to complete " + src +
               " which is already closed. But, it appears to be an RPC " +
-              "retry. Returning success.");
+              "retry. Returning success");
           return true;
         }
       }
@@ -2423,8 +2414,8 @@ public class FSNamesystem implements Nam
 
     finalizeINodeFileUnderConstruction(src, pendingFile);
 
-    NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
-                                  + " is closed by " + holder);
+    NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
+        + holder);
     return true;
   }
 
@@ -2432,14 +2423,12 @@ public class FSNamesystem implements Nam
    * Allocate a block at the given pending filename
    * 
    * @param src path to the file
-   * @param inodes INode representing each of the components of src. 
-   *        <code>inodes[inodes.length-1]</code> is the INode for the file.
-   *        
+   * @param inodesInPath representing each of the components of src. 
+   *                     The last INode is the INode for the file.
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
-  private Block allocateBlock(String src, INode[] inodes,
-      DatanodeDescriptor targets[]) throws QuotaExceededException,
-      SafeModeException {
+  private Block allocateBlock(String src, INodesInPath inodesInPath,
+      DatanodeDescriptor targets[]) throws IOException {
     assert hasWriteLock();
     Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0); 
     while(isValidBlock(b)) {
@@ -2448,9 +2437,9 @@ public class FSNamesystem implements Nam
     // Increment the generation stamp for every new block.
     nextGenerationStamp();
     b.setGenerationStamp(getGenerationStamp());
-    b = dir.addBlock(src, inodes, b, targets);
-    NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
-                                 +src+ ". " + blockPoolId + " "+ b);
+    b = dir.addBlock(src, inodesInPath, b, targets);
+    NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
+        + blockPoolId + " " + b);
     return b;
   }
 
@@ -2468,8 +2457,8 @@ public class FSNamesystem implements Nam
         //
         for (BlockInfo block: v.getBlocks()) {
           if (!block.isComplete()) {
-            LOG.info("BLOCK* NameSystem.checkFileProgress: "
-                + "block " + block + " has not reached minimal replication "
+            LOG.info("BLOCK* checkFileProgress: " + block
+                + " has not reached minimal replication "
                 + blockManager.minReplication);
             return false;
           }
@@ -2480,8 +2469,8 @@ public class FSNamesystem implements Nam
         //
         BlockInfo b = v.getPenultimateBlock();
         if (b != null && !b.isComplete()) {
-          LOG.info("BLOCK* NameSystem.checkFileProgress: "
-              + "block " + b + " has not reached minimal replication "
+          LOG.info("BLOCK* checkFileProgress: " + b
+              + " has not reached minimal replication "
               + blockManager.minReplication);
           return false;
         }
@@ -2679,7 +2668,7 @@ public class FSNamesystem implements Nam
       boolean enforcePermission)
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
-    ArrayList<Block> collectedBlocks = new ArrayList<Block>();
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
 
     writeLock();
     try {
@@ -2687,7 +2676,7 @@ public class FSNamesystem implements Nam
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
-      if (!recursive && !dir.isDirEmpty(src)) {
+      if (!recursive && dir.isNonEmptyDirectory(src)) {
         throw new IOException(src + " is non empty");
       }
       if (enforcePermission && isPermissionEnabled) {
@@ -2710,21 +2699,26 @@ public class FSNamesystem implements Nam
     return true;
   }
 
-  /** 
+  /**
    * From the given list, incrementally remove the blocks from blockManager
    * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
    * ensure that other waiters on the lock can get in. See HDFS-2938
+   * 
+   * @param blocks
+   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
+   *          of blocks that need to be removed from blocksMap
    */
-  private void removeBlocks(List<Block> blocks) {
+  private void removeBlocks(BlocksMapUpdateInfo blocks) {
     int start = 0;
     int end = 0;
-    while (start < blocks.size()) {
+    List<Block> toDeleteList = blocks.getToDeleteList();
+    while (start < toDeleteList.size()) {
       end = BLOCK_DELETION_INCREMENT + start;
-      end = end > blocks.size() ? blocks.size() : end;
+      end = end > toDeleteList.size() ? toDeleteList.size() : end;
       writeLock();
       try {
         for (int i = start; i < end; i++) {
-          blockManager.removeBlock(blocks.get(i));
+          blockManager.removeBlock(toDeleteList.get(i));
         }
       } finally {
         writeUnlock();
@@ -2733,7 +2727,12 @@ public class FSNamesystem implements Nam
     }
   }
   
-  void removePathAndBlocks(String src, List<Block> blocks) {
+  /**
+   * Remove leases and blocks related to a given path
+   * @param src The given path
+   * @param blocks Containing the list of blocks to be deleted from blocksMap
+   */
+  void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks) {
     assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
     if (blocks == null) {
@@ -2746,7 +2745,7 @@ public class FSNamesystem implements Nam
     boolean trackBlockCounts = isSafeModeTrackingBlocks();
     int numRemovedComplete = 0, numRemovedSafe = 0;
 
-    for (Block b : blocks) {
+    for (Block b : blocks.getToDeleteList()) {
       if (trackBlockCounts) {
         BlockInfo bi = blockManager.getStoredBlock(b);
         if (bi.isComplete()) {
@@ -2954,8 +2953,7 @@ public class FSNamesystem implements Nam
    */
   void fsync(String src, String clientName) 
       throws IOException, UnresolvedLinkException {
-    NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
-                                  + src + " for " + clientName);
+    NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2986,26 +2984,12 @@ public class FSNamesystem implements Nam
   boolean internalReleaseLease(Lease lease, String src, 
       String recoveryLeaseHolder) throws AlreadyBeingCreatedException, 
       IOException, UnresolvedLinkException {
-    LOG.info("Recovering lease=" + lease + ", src=" + src);
+    LOG.info("Recovering " + lease + ", src=" + src);
     assert !isInSafeMode();
     assert hasWriteLock();
-    INodeFile iFile = dir.getFileINode(src);
-    if (iFile == null) {
-      final String message = "DIR* NameSystem.internalReleaseLease: "
-        + "attempt to release a create lock on "
-        + src + " file does not exist.";
-      NameNode.stateChangeLog.warn(message);
-      throw new IOException(message);
-    }
-    if (!iFile.isUnderConstruction()) {
-      final String message = "DIR* NameSystem.internalReleaseLease: "
-        + "attempt to release a create lock on "
-        + src + " but file is already closed.";
-      NameNode.stateChangeLog.warn(message);
-      throw new IOException(message);
-    }
 
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+    final INodeFileUnderConstruction pendingFile
+        = INodeFileUnderConstruction.valueOf(dir.getINode(src), src);
     int nrBlocks = pendingFile.numBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
 
@@ -3523,7 +3507,7 @@ public class FSNamesystem implements Nam
   @Metric({"TransactionsSinceLastLogRoll",
       "Number of transactions since last edit log roll"})
   public long getTransactionsSinceLastLogRoll() {
-    if (isInStandbyState()) {
+    if (isInStandbyState() || !getEditLog().isSegmentOpen()) {
       return 0;
     } else {
       return getEditLog().getLastWrittenTxId() -
@@ -3641,7 +3625,7 @@ public class FSNamesystem implements Nam
                               "in order to create namespace image.");
       }
       getFSImage().saveNamespace(this);
-      LOG.info("New namespace image has been created.");
+      LOG.info("New namespace image has been created");
     } finally {
       readUnlock();
     }
@@ -3859,11 +3843,11 @@ public class FSNamesystem implements Nam
       }
       long timeInSafemode = now() - startTime;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
-                                    + timeInSafemode/1000 + " secs.");
+                                    + timeInSafemode/1000 + " secs");
       NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
       
       if (reached >= 0) {
-        NameNode.stateChangeLog.info("STATE* Safe mode is OFF."); 
+        NameNode.stateChangeLog.info("STATE* Safe mode is OFF"); 
       }
       reached = -1;
       safeMode = null;
@@ -4183,7 +4167,7 @@ public class FSNamesystem implements Nam
         }
       }
       if (!fsRunning) {
-        LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread. ");
+        LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
       } else {
         // leave safe mode and stop the monitor
         leaveSafeMode();
@@ -4313,17 +4297,14 @@ public class FSNamesystem implements Nam
     try {
       for (Lease lease : leaseManager.getSortedLeases()) {
         for (String path : lease.getPaths()) {
-          INode node;
+          final INodeFileUnderConstruction cons;
           try {
-            node = dir.getFileINode(path);
+            cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
           } catch (UnresolvedLinkException e) {
             throw new AssertionError("Lease files should reside on this FS");
+          } catch (IOException e) {
+            throw new RuntimeException(e);
           }
-          assert node != null : "Found a lease for nonexisting file.";
-          assert node.isUnderConstruction() :
-            "Found a lease for file " + path + " that is not under construction." +
-            " lease=" + lease;
-          INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
           BlockInfo[] blocks = cons.getBlocks();
           if(blocks == null)
             continue;
@@ -4371,7 +4352,7 @@ public class FSNamesystem implements Nam
       if (isEditlogOpenForWrite) {
         getEditLog().logSyncAll();
       }
-      NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
+      NameNode.stateChangeLog.info("STATE* Safe mode is ON"
           + safeMode.getTurnOffTip());
     } finally {
       writeUnlock();
@@ -4386,7 +4367,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       if (!isInSafeMode()) {
-        NameNode.stateChangeLog.info("STATE* Safe mode is already OFF."); 
+        NameNode.stateChangeLog.info("STATE* Safe mode is already OFF"); 
         return;
       }
       safeMode.leave();
@@ -4740,7 +4721,7 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       
-      NameNode.stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
+      NameNode.stateChangeLog.info("*DIR* reportBadBlocks");
       for (int i = 0; i < blocks.length; i++) {
         ExtendedBlock blk = blocks[i].getBlock();
         DatanodeInfo[] nodes = blocks[i].getLocations();
@@ -4906,21 +4887,12 @@ public class FSNamesystem implements Nam
       for (Lease lease : leaseManager.getSortedLeases()) {
         for(String path : lease.getPaths()) {
           // verify that path exists in namespace
-          INode node;
+          final INodeFileUnderConstruction cons;
           try {
-            node = dir.getFileINode(path);
+            cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
           } catch (UnresolvedLinkException e) {
             throw new AssertionError("Lease files should reside on this FS");
           }
-          if (node == null) {
-            throw new IOException("saveLeases found path " + path +
-                                  " but no matching entry in namespace.");
-          }
-          if (!node.isUnderConstruction()) {
-            throw new IOException("saveLeases found path " + path +
-                                  " but is not under construction.");
-          }
-          INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
           FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
         }
       }
@@ -5491,7 +5463,11 @@ public class FSNamesystem implements Nam
   public BlockManager getBlockManager() {
     return blockManager;
   }
-  
+  /** @return the FSDirectory. */
+  public FSDirectory getFSDirectory() {
+    return dir;
+  }
+
   /**
    * Verifies that the given identifier and password are valid and match.
    * @param identifier Token identifier.
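
A pattern worth calling out in the FSNamesystem hunks above: the repeated
null / instanceof / under-construction checks around dir.getFileINode(src)
are collapsed into valueOf guards (INodeFile.valueOf,
INodeFileUnderConstruction.valueOf) that validate and cast in one place.
Below is a self-contained sketch of the idea, with stub types and an assumed
FileNotFoundException contract rather than the exact HDFS signatures:

    import java.io.FileNotFoundException;

    // Sketch of the valueOf guard pattern; stub types, not the HDFS sources.
    class ValueOfSketch {
      static class INode { }
      static class INodeFile extends INode {
        static INodeFile valueOf(INode inode, String path)
            throws FileNotFoundException {
          if (inode == null) {
            throw new FileNotFoundException("File does not exist: " + path);
          }
          if (!(inode instanceof INodeFile)) {
            throw new FileNotFoundException("Path is not a file: " + path);
          }
          return (INodeFile) inode;
        }
      }

      public static void main(String[] args) throws FileNotFoundException {
        // One call replaces the null check, the type check and the cast
        // that each call site used to repeat.
        INodeFile f = INodeFile.valueOf(new INodeFile(), "/foo/bar");
        System.out.println("resolved to " + f);
      }
    }

Call sites such as checkLease and internalReleaseLease then shrink to a
single line, and the error wording lives in one method.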

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Thu Nov  8 19:09:46 2012
@@ -121,7 +121,7 @@ class FSPermissionChecker {
     }
     // check if (parentAccess != null) && file exists, then check sb
       // Resolve symlinks, the check is performed on the link target.
-      INode[] inodes = root.getExistingPathINodes(path, true);
+      final INode[] inodes = root.getExistingPathINodes(path, true).getINodes();
       int ancestorIndex = inodes.length - 2;
       for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
           ancestorIndex--);
@@ -173,7 +173,7 @@ class FSPermissionChecker {
       INodeDirectory d = directories.pop();
       check(d, access);
 
-      for(INode child : d.getChildren()) {
+      for(INode child : d.getChildrenList()) {
         if (child.isDirectory()) {
           directories.push((INodeDirectory)child);
         }
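
The getChildren() -> getChildrenList() switch above pairs with the
EMPTY_LIST constant added to INode further down; read together they suggest
a null-object accessor that hands back a shared immutable empty list so
iteration needs no null guard. A minimal sketch under that assumption (stub
class; the real INodeDirectory may differ in detail):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Null-object accessor sketch suggested by INode.EMPTY_LIST;
    // illustrative only.
    class ChildrenSketch {
      static final List<ChildrenSketch> EMPTY_LIST =
          Collections.unmodifiableList(new ArrayList<ChildrenSketch>());

      private List<ChildrenSketch> children;  // stays null until first child

      List<ChildrenSketch> getChildrenList() {
        // Never returns null, so for-each callers need no guard.
        return children == null ? EMPTY_LIST : children;
      }

      public static void main(String[] args) {
        ChildrenSketch dir = new ChildrenSketch();
        for (ChildrenSketch child : dir.getChildrenList()) {
          System.out.println(child);  // body never runs; no NPE either
        }
        System.out.println(dir.getChildrenList().size() + " children");
      }
    }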

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java Thu Nov  8 19:09:46 2012
@@ -77,7 +77,7 @@ public class GetDelegationTokenServlet e
       });
 
     } catch(Exception e) {
-      LOG.info("Exception while sending token. Re-throwing. ", e);
+      LOG.info("Exception while sending token. Re-throwing ", e);
       resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
     } finally {
       if(dos != null) dos.close();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Thu Nov  8 19:09:46 2012
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -30,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.SignedBytes;
 
 /**
@@ -39,7 +44,8 @@ import com.google.common.primitives.Sign
  */
 @InterfaceAudience.Private
 abstract class INode implements Comparable<byte[]> {
-  /*
+  static final List<INode> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<INode>());
+  /**
    *  The inode name is in java UTF8 encoding; 
    *  The name in HdfsFileStatus should keep the same encoding as this.
    *  if this encoding is changed, implicitly getFileInfo and listStatus in
@@ -177,14 +183,20 @@ abstract class INode implements Comparab
   /**
    * Check whether it's a directory
    */
-  abstract boolean isDirectory();
+  public boolean isDirectory() {
+    return false;
+  }
 
   /**
-   * Collect all the blocks in all children of this INode.
-   * Count and return the number of files in the sub tree.
-   * Also clears references since this INode is deleted.
+   * Collect all the blocks in all children of this INode. Count and return the
+   * number of files in the sub tree. Also clears references since this INode is
+   * deleted.
+   * 
+   * @param info
+   *          Contains all the blocks collected from the children of this
+   *          INode; these blocks should later be removed from the blocksMap.
    */
-  abstract int collectSubtreeBlocksAndClear(List<Block> v);
+  abstract int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info);
 
   /** Compute {@link ContentSummary}. */
   public final ContentSummary computeContentSummary() {
@@ -222,11 +234,10 @@ abstract class INode implements Comparab
   abstract DirCounts spaceConsumedInTree(DirCounts counts);
   
   /**
-   * Get local file name
-   * @return local file name
+   * @return null if the local name is null; otherwise, return the local name.
    */
   String getLocalName() {
-    return DFSUtil.bytes2String(name);
+    return name == null? null: DFSUtil.bytes2String(name);
   }
 
 
@@ -236,8 +247,8 @@ abstract class INode implements Comparab
   }
 
   /**
-   * Get local file name
-   * @return local file name
+   * @return null if the local name is null;
+   *         otherwise, return the local name byte array.
    */
   byte[] getLocalNameBytes() {
     return name;
@@ -327,7 +338,7 @@ abstract class INode implements Comparab
   /**
    * Check whether it's a symlink
    */
-  public boolean isLink() {
+  public boolean isSymlink() {
     return false;
   }
 
@@ -455,4 +466,74 @@ abstract class INode implements Comparab
     return new INodeFile(permissions, blocks, replication,
         modificationTime, atime, preferredBlockSize);
   }
+
+  /**
+   * Dump the subtree starting from this inode.
+   * @return a text representation of the tree.
+   */
+  @VisibleForTesting
+  public StringBuffer dumpTreeRecursively() {
+    final StringWriter out = new StringWriter(); 
+    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder());
+    return out.getBuffer();
+  }
+
+  /**
+   * Dump tree recursively.
+   * @param prefix The prefix string printed at the beginning of each line.
+   */
+  @VisibleForTesting
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
+    out.print(prefix);
+    out.print(" ");
+    out.print(getLocalName());
+    out.print("   (");
+    final String s = super.toString();
+    out.print(s.substring(s.lastIndexOf(getClass().getSimpleName())));
+    out.println(")");
+  }
+  
+  /**
+   * Information used for updating the blocksMap when deleting files.
+   */
+  public static class BlocksMapUpdateInfo {
+    /**
+     * The list of blocks that need to be removed from blocksMap
+     */
+    private List<Block> toDeleteList;
+    
+    public BlocksMapUpdateInfo(List<Block> toDeleteList) {
+      this.toDeleteList = toDeleteList == null ? new ArrayList<Block>()
+          : toDeleteList;
+    }
+    
+    public BlocksMapUpdateInfo() {
+      toDeleteList = new ArrayList<Block>();
+    }
+    
+    /**
+     * @return The list of blocks that need to be removed from blocksMap
+     */
+    public List<Block> getToDeleteList() {
+      return toDeleteList;
+    }
+    
+    /**
+     * Add a to-be-deleted block into the
+     * {@link BlocksMapUpdateInfo#toDeleteList}
+     * @param toDelete the to-be-deleted block
+     */
+    public void addDeleteBlock(Block toDelete) {
+      if (toDelete != null) {
+        toDeleteList.add(toDelete);
+      }
+    }
+    
+    /**
+     * Clear {@link BlocksMapUpdateInfo#toDeleteList}
+     */
+    public void clear() {
+      toDeleteList.clear();
+    }
+  }
 }
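
Since BlocksMapUpdateInfo is new in this patch, a small usage sketch may
help; the Block stub below stands in for
org.apache.hadoop.hdfs.protocol.Block, and the container mirrors the class
added above:

    import java.util.ArrayList;
    import java.util.List;

    // Usage sketch for the BlocksMapUpdateInfo API added above; Block is
    // stubbed so the example compiles on its own.
    class BlocksMapUpdateDemo {
      static class Block {
        final long blockId;
        Block(long blockId) { this.blockId = blockId; }
      }

      static class BlocksMapUpdateInfo {   // mirrors the class in the patch
        private final List<Block> toDeleteList = new ArrayList<Block>();
        public List<Block> getToDeleteList() { return toDeleteList; }
        public void addDeleteBlock(Block b) {
          if (b != null) toDeleteList.add(b);
        }
        public void clear() { toDeleteList.clear(); }
      }

      public static void main(String[] args) {
        BlocksMapUpdateInfo collected = new BlocksMapUpdateInfo();
        collected.addDeleteBlock(new Block(1L));
        collected.addDeleteBlock(null);    // dropped, as in the patch
        // removeBlocks() drains this list in BLOCK_DELETION_INCREMENT
        // batches, releasing the write lock between batches (HDFS-2938).
        for (Block b : collected.getToDeleteList()) {
          System.out.println("remove block " + b.blockId + " from blocksMap");
        }
        collected.clear();                 // reset once the batch is done
      }
    }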


