hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1480838 [3/7] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/...
Date Thu, 09 May 2013 23:55:52 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu May  9 23:55:49 2013
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.FSLimitException;
+import org.apache.hadoop.hdfs.protocol.FSLimitException.IllegalNameException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -62,10 +62,15 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotException;
 import org.apache.hadoop.hdfs.util.ByteArray;
-import org.apache.hadoop.hdfs.util.GSet;
-import org.apache.hadoop.hdfs.util.LightWeightGSet;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -81,9 +86,13 @@ import com.google.common.base.Preconditi
  *************************************************/
 public class FSDirectory implements Closeable {
   private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
-    return new INodeDirectoryWithQuota(INodeId.ROOT_INODE_ID,
+    final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
+        INodeId.ROOT_INODE_ID,
         INodeDirectory.ROOT_NAME,
         namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)));
+    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
+    s.setSnapshotQuota(0);
+    return s;
   }
 
   @VisibleForTesting
@@ -103,7 +112,7 @@ public class FSDirectory implements Clos
   private final int maxComponentLength;
   private final int maxDirItems;
   private final int lsLimit;  // max list limit
-  private GSet<INode, INode> inodeMap; // Synchronized by dirLock
+  private final INodeMap inodeMap; // Synchronized by dirLock
 
   // lock to protect the directory and BlockMap
   private ReentrantReadWriteLock dirLock;
@@ -144,7 +153,7 @@ public class FSDirectory implements Clos
     this.dirLock = new ReentrantReadWriteLock(true); // fair
     this.cond = dirLock.writeLock().newCondition();
     rootDir = createRoot(ns);
-    inodeMap = initInodeMap(rootDir);
+    inodeMap = INodeMap.newInstance(rootDir);
     this.fsImage = fsImage;
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
@@ -167,16 +176,6 @@ public class FSDirectory implements Clos
     nameCache = new NameCache<ByteArray>(threshold);
     namesystem = ns;
   }
-  
-  @VisibleForTesting
-  static LightWeightGSet<INode, INode> initInodeMap(INodeDirectory rootDir) {
-    // Compute the map capacity by allocating 1% of total memory
-    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
-    LightWeightGSet<INode, INode> map = new LightWeightGSet<INode, INode>(
-        capacity);
-    map.put(rootDir);
-    return map;
-  }
     
   private FSNamesystem getFSNamesystem() {
     return namesystem;
@@ -186,6 +185,11 @@ public class FSDirectory implements Clos
     return getFSNamesystem().getBlockManager();
   }
 
+  /** @return the root directory inode. */
+  public INodeDirectoryWithQuota getRoot() {
+    return rootDir;
+  }
+
   /**
    * Notify that loading of this FSDirectory is complete, and
    * it is ready for use 
@@ -218,7 +222,7 @@ public class FSDirectory implements Clos
     ready = flag;
   }
 
-  private void incrDeletedFileCount(int count) {
+  private void incrDeletedFileCount(long count) {
     if (getFSNamesystem() != null)
       NameNode.getNameNodeMetrics().incrFilesDeleted(count);
   }
@@ -255,6 +259,7 @@ public class FSDirectory implements Clos
    * @throws FileAlreadyExistsException
    * @throws QuotaExceededException
    * @throws UnresolvedLinkException
+   * @throws SnapshotAccessControlException 
    */
   INodeFileUnderConstruction addFile(String path, 
                 PermissionStatus permissions,
@@ -264,7 +269,7 @@ public class FSDirectory implements Clos
                 String clientMachine,
                 DatanodeDescriptor clientNode)
     throws FileAlreadyExistsException, QuotaExceededException,
-      UnresolvedLinkException {
+      UnresolvedLinkException, SnapshotAccessControlException {
     waitForReady();
 
     // Always do an implicit mkdirs for parent directory tree.
@@ -303,7 +308,7 @@ public class FSDirectory implements Clos
     return newNode;
   }
 
-  INode unprotectedAddFile( long id,
+  INodeFile unprotectedAddFile( long id,
                             String path, 
                             PermissionStatus permissions,
                             short replication,
@@ -313,14 +318,14 @@ public class FSDirectory implements Clos
                             boolean underConstruction,
                             String clientName,
                             String clientMachine) {
-    final INode newNode;
+    final INodeFile newNode;
     assert hasWriteLock();
     if (underConstruction) {
       newNode = new INodeFileUnderConstruction(id, permissions, replication,
           preferredBlockSize, modificationTime, clientName, clientMachine, null);
     } else {
-      newNode = new INodeFile(id, permissions, BlockInfo.EMPTY_ARRAY,
-          replication, modificationTime, atime, preferredBlockSize);
+      newNode = new INodeFile(id, null, permissions, modificationTime, atime,
+          BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
     }
 
     try {
@@ -346,19 +351,17 @@ public class FSDirectory implements Clos
 
     writeLock();
     try {
-      final INode[] inodes = inodesInPath.getINodes();
       final INodeFileUnderConstruction fileINode = 
-          INodeFileUnderConstruction.valueOf(inodes[inodes.length-1], path);
+          INodeFileUnderConstruction.valueOf(inodesInPath.getLastINode(), path);
 
       // check quota limits and updated space consumed
-      updateCount(inodesInPath, inodes.length-1, 0,
-          fileINode.getPreferredBlockSize()*fileINode.getBlockReplication(), true);
+      updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true);
 
       // associate new last block for the file
       BlockInfoUnderConstruction blockInfo =
         new BlockInfoUnderConstruction(
             block,
-            fileINode.getBlockReplication(),
+            fileINode.getFileReplication(),
             BlockUCState.UNDER_CONSTRUCTION,
             targets);
       getBlockManager().addBlockCollection(blockInfo, fileINode);
@@ -400,11 +403,9 @@ public class FSDirectory implements Clos
    */
   void closeFile(String path, INodeFile file) {
     waitForReady();
-    long now = now();
     writeLock();
     try {
       // file is closed
-      file.setModificationTimeForce(now);
       fsImage.getEditLog().logCloseFile(path, file);
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
@@ -444,20 +445,19 @@ public class FSDirectory implements Clos
     }
 
     // update space consumed
-    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, true);
-    final INode[] inodes = inodesInPath.getINodes();
-    updateCount(inodesInPath, inodes.length-1, 0,
-        -fileNode.getPreferredBlockSize()*fileNode.getBlockReplication(), true);
+    final INodesInPath iip = rootDir.getINodesInPath4Write(path, true);
+    updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
   }
 
   /**
+   * @throws SnapshotAccessControlException 
    * @see #unprotectedRenameTo(String, String, long)
    * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
    */
   @Deprecated
   boolean renameTo(String src, String dst) 
       throws QuotaExceededException, UnresolvedLinkException, 
-      FileAlreadyExistsException {
+      FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
           +src+" to "+dst);
@@ -507,16 +507,16 @@ public class FSDirectory implements Clos
    * @return true if rename succeeds; false otherwise
    * @throws QuotaExceededException if the operation violates any quota limit
    * @throws FileAlreadyExistsException if the src is a symlink that points to dst
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    * @deprecated See {@link #renameTo(String, String)}
    */
   @Deprecated
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
-    FileAlreadyExistsException {
+    FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     assert hasWriteLock();
-    INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
-    INode[] srcInodes = srcInodesInPath.getINodes();
-    INode srcInode = srcInodes[srcInodes.length-1];
+    INodesInPath srcIIP = rootDir.getINodesInPath4Write(src, false);
+    final INode srcInode = srcIIP.getLastINode();
     
     // check the validation of the source
     if (srcInode == null) {
@@ -525,11 +525,18 @@ public class FSDirectory implements Clos
           + " because source does not exist");
       return false;
     } 
-    if (srcInodes.length == 1) {
+    if (srcIIP.getINodes().length == 1) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           +"failed to rename "+src+" to "+dst+ " because source is the root");
       return false;
     }
+    
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    List<INodeDirectorySnapshottable> snapshottableDirs = 
+        new ArrayList<INodeDirectorySnapshottable>();
+    checkSnapshot(srcInode, snapshottableDirs);
+    
     if (isDir(dst)) {
       dst += Path.SEPARATOR + new Path(src).getName();
     }
@@ -539,7 +546,7 @@ public class FSDirectory implements Clos
       return true;
     }
     if (srcInode.isSymlink() && 
-        dst.equals(((INodeSymlink)srcInode).getSymlinkString())) {
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
     }
@@ -554,16 +561,19 @@ public class FSDirectory implements Clos
     }
     
     byte[][] dstComponents = INode.getPathComponents(dst);
-    INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
-        dstComponents.length, false);
-    INode[] dstInodes = dstInodesInPath.getINodes();
-    if (dstInodes[dstInodes.length-1] != null) {
+    INodesInPath dstIIP = getExistingPathINodes(dstComponents);
+    if (dstIIP.isSnapshot()) {
+      throw new SnapshotAccessControlException(
+          "Modification on RO snapshot is disallowed");
+    }
+    if (dstIIP.getLastINode() != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                    +"failed to rename "+src+" to "+dst+ 
                                    " because destination exists");
       return false;
     }
-    if (dstInodes[dstInodes.length-2] == null) {
+    INode dstParent = dstIIP.getINode(-2);
+    if (dstParent == null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           +"failed to rename "+src+" to "+dst+ 
           " because destination's parent does not exist");
@@ -571,43 +581,139 @@ public class FSDirectory implements Clos
     }
     
     // Ensure dst has quota to accommodate rename
-    verifyQuotaForRename(srcInodes, dstInodes);
+    verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
     
     boolean added = false;
-    INode srcChild = null;
-    String srcChildName = null;
+    INode srcChild = srcIIP.getLastINode();
+    final byte[] srcChildName = srcChild.getLocalNameBytes();
+    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
+        srcIIP.getLatestSnapshot());
+    final boolean srcChildIsReference = srcChild.isReference();
+    
+    // Record the snapshot on srcChild. After the rename, before any new 
+    // snapshot is taken on the dst tree, changes will be recorded in the latest
+    // snapshot of the src tree.
+    if (isSrcInSnapshot) {
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot(),
+          inodeMap);
+      srcIIP.setLastINode(srcChild);
+    }
+    
+    // check srcChild for reference
+    final INodeReference.WithCount withCount;
+    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();
+    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
+        .getDstSnapshotId() : Snapshot.INVALID_ID;
+    if (isSrcInSnapshot) {
+      final INodeReference.WithName withName = 
+          srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
+              srcChild, srcIIP.getLatestSnapshot()); 
+      withCount = (INodeReference.WithCount) withName.getReferredINode();
+      srcChild = withName;
+      srcIIP.setLastINode(srcChild);
+      // get the counts before rename
+      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true,
+          Snapshot.INVALID_ID);
+    } else if (srcChildIsReference) {
+      // srcChild is reference but srcChild is not in latest snapshot
+      withCount = (WithCount) srcChild.asReference().getReferredINode();
+    } else {
+      withCount = null;
+    }
+
     try {
       // remove src
-      srcChild = removeLastINode(srcInodesInPath);
-      if (srcChild == null) {
+      final long removedSrc = removeLastINode(srcIIP);
+      if (removedSrc == -1) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
             + " because the source can not be removed");
         return false;
       }
-      srcChildName = srcChild.getLocalName();
-      srcChild.setLocalName(dstComponents[dstInodes.length-1]);
+      
+      if (dstParent.getParent() == null) {
+        // src and dst file/dir are in the same directory, and the dstParent has
+        // been replaced when we removed the src. Refresh the dstIIP and
+        // dstParent.
+        dstIIP = getExistingPathINodes(dstComponents);
+        dstParent = dstIIP.getINode(-2);
+      }
       
       // add src to the destination
-      added = addLastINodeNoQuotaCheck(dstInodesInPath, srcChild);
+      
+      srcChild = srcIIP.getLastINode();
+      final byte[] dstChildName = dstIIP.getLastLocalName();
+      final INode toDst;
+      if (withCount == null) {
+        srcChild.setLocalName(dstChildName);
+        toDst = srcChild;
+      } else {
+        withCount.getReferredINode().setLocalName(dstChildName);
+        Snapshot dstSnapshot = dstIIP.getLatestSnapshot();
+        final INodeReference.DstReference ref = new INodeReference.DstReference(
+            dstParent.asDirectory(), withCount,
+            dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
+        toDst = ref;
+      }
+      
+      added = addLastINodeNoQuotaCheck(dstIIP, toDst);
       if (added) {
-        srcChild = null;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
               + src + " is renamed to " + dst);
         }
         // update modification time of dst and the parent of src
-        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
-        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
+        final INode srcParent = srcIIP.getINode(-2);
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot(),
+            inodeMap);
+        dstParent = dstIIP.getINode(-2); // refresh dstParent
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot(),
+            inodeMap);
         // update moved leases with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);        
+        getFSNamesystem().unprotectedChangeLease(src, dst);     
+
+        // update the quota usage in src tree
+        if (isSrcInSnapshot) {
+          // get the counts after rename
+          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+              Quota.Counts.newInstance(), false, Snapshot.INVALID_ID);
+          newSrcCounts.subtract(oldSrcCounts);
+          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+              newSrcCounts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID);
+        }
+        
         return true;
       }
     } finally {
-      if (!added && srcChild != null) {
+      if (!added) {
+        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
+        final INode oldSrcChild = srcChild;
         // put it back
-        srcChild.setLocalName(srcChildName);
-        addLastINodeNoQuotaCheck(srcInodesInPath, srcChild);
+        if (withCount == null) {
+          srcChild.setLocalName(srcChildName);
+        } else if (!srcChildIsReference) { // src must be in snapshot
+          // the withCount node will no longer be used thus no need to update
+          // its reference number here
+          final INode originalChild = withCount.getReferredINode();
+          srcChild = originalChild;
+        } else {
+          withCount.removeReference(oldSrcChild.asReference());
+          final INodeReference originalRef = new INodeReference.DstReference(
+              srcParent, withCount, srcRefDstSnapshot);
+          srcChild = originalRef;
+        }
+        
+        if (isSrcInSnapshot) {
+          // srcParent must be an INodeDirectoryWithSnapshot instance since
+          // isSrcInSnapshot is true and src node has been removed from 
+          // srcParent 
+          ((INodeDirectoryWithSnapshot) srcParent).undoRename4ScrParent(
+              oldSrcChild.asReference(), srcChild, srcIIP.getLatestSnapshot());
+        } else {
+          // original srcChild is not in latest snapshot, we only need to add
+          // the srcChild back
+          addLastINodeNoQuotaCheck(srcIIP, srcChild);
+        }
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -639,9 +745,8 @@ public class FSDirectory implements Clos
       }
     }
     String error = null;
-    final INodesInPath srcInodesInPath = rootDir.getExistingPathINodes(src, false);
-    final INode[] srcInodes = srcInodesInPath.getINodes();
-    final INode srcInode = srcInodes[srcInodes.length - 1];
+    final INodesInPath srcIIP = rootDir.getINodesInPath4Write(src, false);
+    final INode srcInode = srcIIP.getLastINode();
     // validate source
     if (srcInode == null) {
       error = "rename source " + src + " is not found.";
@@ -649,20 +754,23 @@ public class FSDirectory implements Clos
           + error);
       throw new FileNotFoundException(error);
     }
-    if (srcInodes.length == 1) {
+    if (srcIIP.getINodes().length == 1) {
       error = "rename source cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new IOException(error);
     }
-
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    checkSnapshot(srcInode, null);
+    
     // validate the destination
     if (dst.equals(src)) {
       throw new FileAlreadyExistsException(
           "The source "+src+" and destination "+dst+" are the same");
     }
     if (srcInode.isSymlink() && 
-        dst.equals(((INodeSymlink)srcInode).getSymlinkString())) {
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
     }
@@ -675,17 +783,17 @@ public class FSDirectory implements Clos
           + error);
       throw new IOException(error);
     }
-    final byte[][] dstComponents = INode.getPathComponents(dst);
-    INodesInPath dstInodesInPath = rootDir.getExistingPathINodes(dstComponents,
-        dstComponents.length, false);
-    final INode[] dstInodes = dstInodesInPath.getINodes();
-    INode dstInode = dstInodes[dstInodes.length - 1];
-    if (dstInodes.length == 1) {
+    INodesInPath dstIIP = rootDir.getINodesInPath4Write(dst, false);
+    if (dstIIP.getINodes().length == 1) {
       error = "rename destination cannot be the root";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new IOException(error);
     }
+
+    final INode dstInode = dstIIP.getLastINode();
+    List<INodeDirectorySnapshottable> snapshottableDirs = 
+        new ArrayList<INodeDirectorySnapshottable>();
     if (dstInode != null) { // Destination exists
       // It's OK to rename a file to a symlink and vice versa
       if (dstInode.isDirectory() != srcInode.isDirectory()) {
@@ -702,8 +810,8 @@ public class FSDirectory implements Clos
         throw new FileAlreadyExistsException(error);
       }
       if (dstInode.isDirectory()) {
-        final List<INode> children = ((INodeDirectory) dstInode
-            ).getChildrenList();
+        final ReadOnlyList<INode> children = dstInode.asDirectory()
+            .getChildrenList(null);
         if (!children.isEmpty()) {
           error = "rename destination directory is not empty: " + dst;
           NameNode.stateChangeLog.warn(
@@ -711,14 +819,17 @@ public class FSDirectory implements Clos
           throw new IOException(error);
         }
       }
+      checkSnapshot(dstInode, snapshottableDirs);
     }
-    if (dstInodes[dstInodes.length - 2] == null) {
+
+    INode dstParent = dstIIP.getINode(-2);
+    if (dstParent == null) {
       error = "rename destination parent " + dst + " not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new FileNotFoundException(error);
     }
-    if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+    if (!dstParent.isDirectory()) {
       error = "rename destination parent " + dst + " is a file.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
@@ -726,82 +837,204 @@ public class FSDirectory implements Clos
     }
 
     // Ensure dst has quota to accommodate rename
-    verifyQuotaForRename(srcInodes, dstInodes);
-    INode removedSrc = removeLastINode(srcInodesInPath);
-    if (removedSrc == null) {
+    verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
+
+    INode srcChild = srcIIP.getLastINode();
+    final byte[] srcChildName = srcChild.getLocalNameBytes();
+    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
+        srcIIP.getLatestSnapshot());
+    final boolean srcChildIsReference = srcChild.isReference();
+    
+    // Record the snapshot on srcChild. After the rename, before any new 
+    // snapshot is taken on the dst tree, changes will be recorded in the latest
+    // snapshot of the src tree.
+    if (isSrcInSnapshot) {
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot(),
+          inodeMap);
+      srcIIP.setLastINode(srcChild);
+    }
+    
+    // check srcChild for reference
+    final INodeReference.WithCount withCount;
+    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
+        .getDstSnapshotId() : Snapshot.INVALID_ID;
+    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();    
+    if (isSrcInSnapshot) {
+      final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
+          .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshot()); 
+      withCount = (INodeReference.WithCount) withName.getReferredINode();
+      srcChild = withName;
+      srcIIP.setLastINode(srcChild);
+      // get the counts before rename
+      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true,
+          Snapshot.INVALID_ID);
+    } else if (srcChildIsReference) {
+      // srcChild is reference but srcChild is not in latest snapshot
+      withCount = (WithCount) srcChild.asReference().getReferredINode();
+    } else {
+      withCount = null;
+    }
+    
+    boolean undoRemoveSrc = true;
+    final long removedSrc = removeLastINode(srcIIP);
+    if (removedSrc == -1) {
       error = "Failed to rename " + src + " to " + dst
           + " because the source can not be removed";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new IOException(error);
     }
-    final String srcChildName = removedSrc.getLocalName();
-    String dstChildName = null;
+    
+    if (dstParent.getParent() == null) {
+      // src and dst file/dir are in the same directory, and the dstParent has
+      // been replaced when we removed the src. Refresh the dstIIP and
+      // dstParent.
+      dstIIP = rootDir.getINodesInPath4Write(dst, false);
+    }
+    
+    boolean undoRemoveDst = false;
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
-        removedDst = removeLastINode(dstInodesInPath);
-        dstChildName = removedDst.getLocalName();
+        if (removeLastINode(dstIIP) != -1) {
+          removedDst = dstIIP.getLastINode();
+          undoRemoveDst = true;
+        }
+      }
+      
+      srcChild = srcIIP.getLastINode();
+
+      final byte[] dstChildName = dstIIP.getLastLocalName();
+      final INode toDst;
+      if (withCount == null) {
+        srcChild.setLocalName(dstChildName);
+        toDst = srcChild;
+      } else {
+        withCount.getReferredINode().setLocalName(dstChildName);
+        Snapshot dstSnapshot = dstIIP.getLatestSnapshot();
+        final INodeReference.DstReference ref = new INodeReference.DstReference(
+            dstIIP.getINode(-2).asDirectory(), withCount,
+            dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
+        toDst = ref;
       }
 
-      removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
       // add src as dst to complete rename
-      if (addLastINodeNoQuotaCheck(dstInodesInPath, removedSrc)) {
-        removedSrc = null;
+      if (addLastINodeNoQuotaCheck(dstIIP, toDst)) {
+        undoRemoveSrc = false;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
               "DIR* FSDirectory.unprotectedRenameTo: " + src
               + " is renamed to " + dst);
         }
-        srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
-        dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+
+        final INode srcParent = srcIIP.getINode(-2);
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot(),
+            inodeMap);
+        dstParent = dstIIP.getINode(-2);
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot(),
+            inodeMap);
         // update moved lease with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);
 
         // Collect the blocks and remove the lease for previous dst
-        int filesDeleted = 0;
+        long filesDeleted = -1;
         if (removedDst != null) {
-          INode rmdst = removedDst;
-          removedDst = null;
+          undoRemoveDst = false;
           BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-          filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
-          getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+          List<INode> removedINodes = new ArrayList<INode>();
+          filesDeleted = removedDst.cleanSubtree(null,
+              dstIIP.getLatestSnapshot(), collectedBlocks, removedINodes).get(
+              Quota.NAMESPACE);
+          getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
+              removedINodes);
+        }
+
+        if (snapshottableDirs.size() > 0) {
+          // There are snapshottable directories (without snapshots) to be
+          // deleted. Need to update the SnapshotManager.
+          namesystem.removeSnapshottableDirs(snapshottableDirs);
+        }
+        
+        // update the quota usage in src tree
+        if (isSrcInSnapshot) {
+          // get the counts after rename
+          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+              Quota.Counts.newInstance(), false, Snapshot.INVALID_ID);
+          newSrcCounts.subtract(oldSrcCounts);
+          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+              newSrcCounts.get(Quota.DISKSPACE), false, Snapshot.INVALID_ID);
         }
-        return filesDeleted >0;
+        
+        return filesDeleted >= 0;
       }
     } finally {
-      if (removedSrc != null) {
+      if (undoRemoveSrc) {
         // Rename failed - restore src
-        removedSrc.setLocalName(srcChildName);
-        addLastINodeNoQuotaCheck(srcInodesInPath, removedSrc);
+        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
+        final INode oldSrcChild = srcChild;
+        // put it back
+        if (withCount == null) {
+          srcChild.setLocalName(srcChildName);
+        } else if (!srcChildIsReference) { // src must be in snapshot
+          // the withCount node will no longer be used thus no need to update
+          // its reference number here
+          final INode originalChild = withCount.getReferredINode();
+          srcChild = originalChild;
+        } else {
+          withCount.removeReference(oldSrcChild.asReference());
+          final INodeReference originalRef = new INodeReference.DstReference(
+              srcParent, withCount, srcRefDstSnapshot);
+          srcChild = originalRef;
+        }
+        
+        if (srcParent instanceof INodeDirectoryWithSnapshot) {
+          ((INodeDirectoryWithSnapshot) srcParent).undoRename4ScrParent(
+              oldSrcChild.asReference(), srcChild, srcIIP.getLatestSnapshot());
+        } else {
+          // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
+          // the srcChild back
+          addLastINodeNoQuotaCheck(srcIIP, srcChild);
+        }
       }
-      if (removedDst != null) {
+      if (undoRemoveDst) {
         // Rename failed - restore dst
-        removedDst.setLocalName(dstChildName);
-        addLastINodeNoQuotaCheck(dstInodesInPath, removedDst);
+        if (dstParent instanceof INodeDirectoryWithSnapshot) {
+          ((INodeDirectoryWithSnapshot) dstParent).undoRename4DstParent(
+              removedDst, dstIIP.getLatestSnapshot());
+        } else {
+          addLastINodeNoQuotaCheck(dstIIP, removedDst);
+        }
+        if (removedDst.isReference()) {
+          final INodeReference removedDstRef = removedDst.asReference();
+          final INodeReference.WithCount wc = 
+              (WithCount) removedDstRef.getReferredINode().asReference();
+          wc.addReference(removedDstRef);
+        }
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
         + "failed to rename " + src + " to " + dst);
     throw new IOException("rename from " + src + " to " + dst + " failed.");
   }
-
+  
   /**
    * Set file replication
    * 
    * @param src file name
    * @param replication new replication
-   * @param oldReplication old replication - output parameter
+   * @param blockRepls block replications - output parameter
    * @return array of file blocks
    * @throws QuotaExceededException
+   * @throws SnapshotAccessControlException 
    */
-  Block[] setReplication(String src, short replication, short[] oldReplication)
-      throws QuotaExceededException, UnresolvedLinkException {
+  Block[] setReplication(String src, short replication, short[] blockRepls)
+      throws QuotaExceededException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     waitForReady();
-    Block[] fileBlocks = null;
     writeLock();
     try {
-      fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
+      final Block[] fileBlocks = unprotectedSetReplication(
+          src, replication, blockRepls);
       if (fileBlocks != null)  // log replication change
         fsImage.getEditLog().logSetReplication(src, replication);
       return fileBlocks;
@@ -810,32 +1043,42 @@ public class FSDirectory implements Clos
     }
   }
 
-  Block[] unprotectedSetReplication(String src, 
-                                    short replication,
-                                    short[] oldReplication
-                                    ) throws QuotaExceededException, 
-                                    UnresolvedLinkException {
+  Block[] unprotectedSetReplication(String src, short replication,
+      short[] blockRepls) throws QuotaExceededException,
+      UnresolvedLinkException, SnapshotAccessControlException {
     assert hasWriteLock();
 
-    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
-    final INode[] inodes = inodesInPath.getINodes();
-    INode inode = inodes[inodes.length - 1];
+    final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
+    final INode inode = iip.getLastINode();
     if (inode == null || !inode.isFile()) {
       return null;
     }
-    INodeFile fileNode = (INodeFile)inode;
-    final short oldRepl = fileNode.getBlockReplication();
+    INodeFile file = inode.asFile();
+    final short oldBR = file.getBlockReplication();
 
-    // check disk quota
-    long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
-    updateCount(inodesInPath, inodes.length-1, 0, dsDelta, true);
+    // before setFileReplication, check for increasing block replication.
+    // if replication > oldBR, then newBR == replication.
+    // if replication < oldBR, we don't know newBR yet. 
+    if (replication > oldBR) {
+      long dsDelta = (replication - oldBR)*(file.diskspaceConsumed()/oldBR);
+      updateCount(iip, 0, dsDelta, true);
+    }
 
-    fileNode.setReplication(replication);
+    file = file.setFileReplication(replication, iip.getLatestSnapshot(),
+        inodeMap);
+    
+    final short newBR = file.getBlockReplication(); 
+    // check newBR < oldBR case. 
+    if (newBR < oldBR) {
+      long dsDelta = (newBR - oldBR)*(file.diskspaceConsumed()/newBR);
+      updateCount(iip, 0, dsDelta, true);
+    }
 
-    if (oldReplication != null) {
-      oldReplication[0] = oldRepl;
+    if (blockRepls != null) {
+      blockRepls[0] = oldBR;
+      blockRepls[1] = newBR;
     }
-    return fileNode.getBlocks();
+    return file.getBlocks();
   }
 
   /**
@@ -861,14 +1104,15 @@ public class FSDirectory implements Clos
       if (inode == null) {
          return false;
       }
-      return !inode.isFile() || ((INodeFile)inode).getBlocks() != null;
+      return !inode.isFile() || inode.asFile().getBlocks() != null;
     } finally {
       readUnlock();
     }
   }
-
+  
   void setPermission(String src, FsPermission permission)
-      throws FileNotFoundException, UnresolvedLinkException {
+      throws FileNotFoundException, UnresolvedLinkException,
+      QuotaExceededException, SnapshotAccessControlException {
     writeLock();
     try {
       unprotectedSetPermission(src, permission);
@@ -877,19 +1121,23 @@ public class FSDirectory implements Clos
     }
     fsImage.getEditLog().logSetPermissions(src, permission);
   }
-
-  void unprotectedSetPermission(String src, FsPermission permissions) 
-      throws FileNotFoundException, UnresolvedLinkException {
+  
+  void unprotectedSetPermission(String src, FsPermission permissions)
+      throws FileNotFoundException, UnresolvedLinkException,
+      QuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
-    INode inode = rootDir.getNode(src, true);
+    final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(src, true);
+    final INode inode = inodesInPath.getLastINode();
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
-    inode.setPermission(permissions);
+    inode.setPermission(permissions, inodesInPath.getLatestSnapshot(), 
+        inodeMap);
   }
 
   void setOwner(String src, String username, String groupname)
-      throws FileNotFoundException, UnresolvedLinkException {
+      throws FileNotFoundException, UnresolvedLinkException,
+      QuotaExceededException, SnapshotAccessControlException {
     writeLock();
     try {
       unprotectedSetOwner(src, username, groupname);
@@ -899,26 +1147,30 @@ public class FSDirectory implements Clos
     fsImage.getEditLog().logSetOwner(src, username, groupname);
   }
 
-  void unprotectedSetOwner(String src, String username, String groupname) 
-      throws FileNotFoundException, UnresolvedLinkException {
+  void unprotectedSetOwner(String src, String username, String groupname)
+      throws FileNotFoundException, UnresolvedLinkException,
+      QuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
-    INode inode = rootDir.getNode(src, true);
+    final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(src, true);
+    INode inode = inodesInPath.getLastINode();
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
     if (username != null) {
-      inode.setUser(username);
+      inode = inode.setUser(username, inodesInPath.getLatestSnapshot(),
+          inodeMap);
     }
     if (groupname != null) {
-      inode.setGroup(groupname);
+      inode.setGroup(groupname, inodesInPath.getLatestSnapshot(), inodeMap);
     }
   }
 
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
-  public void concat(String target, String [] srcs) 
-      throws UnresolvedLinkException {
+  void concat(String target, String [] srcs) 
+      throws UnresolvedLinkException, QuotaExceededException,
+      SnapshotAccessControlException, SnapshotException {
     writeLock();
     try {
       // actual move
@@ -931,38 +1183,49 @@ public class FSDirectory implements Clos
       writeUnlock();
     }
   }
-  
 
-  
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    * @param target target file to move the blocks to
    * @param srcs list of file to move the blocks from
-   * Must be public because also called from EditLogs
-   * NOTE: - it does not update quota (not needed for concat)
    */
-  public void unprotectedConcat(String target, String [] srcs, long timestamp) 
-      throws UnresolvedLinkException {
+  void unprotectedConcat(String target, String [] srcs, long timestamp) 
+      throws UnresolvedLinkException, QuotaExceededException,
+      SnapshotAccessControlException, SnapshotException {
     assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
     }
     // do the move
     
-    final INodesInPath trgINodesInPath = rootDir.getExistingPathINodes(target, true);
-    final INode[] trgINodes = trgINodesInPath.getINodes();
-    INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
-    INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
-    
-    INodeFile [] allSrcInodes = new INodeFile[srcs.length];
-    int i = 0;
-    int totalBlocks = 0;
-    for(String src : srcs) {
-      INodeFile srcInode = (INodeFile)getINode(src);
-      allSrcInodes[i++] = srcInode;
-      totalBlocks += srcInode.numBlocks();  
+    final INodesInPath trgIIP = rootDir.getINodesInPath4Write(target, true);
+    final INode[] trgINodes = trgIIP.getINodes();
+    final INodeFile trgInode = trgIIP.getLastINode().asFile();
+    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
+    final Snapshot trgLatestSnapshot = trgIIP.getLatestSnapshot();
+    
+    final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+    for(int i = 0; i < srcs.length; i++) {
+      final INodesInPath iip = getINodesInPath4Write(srcs[i]);
+      final Snapshot latest = iip.getLatestSnapshot();
+      final INode inode = iip.getLastINode();
+
+      // check if the file is in the latest snapshot
+      if (inode.isInLatestSnapshot(latest)) {
+        throw new SnapshotException("Concat: the source file " + srcs[i]
+            + " is in snapshot " + latest);
+      }
+
+      // check if the file has other references.
+      if (inode.isReference() && ((INodeReference.WithCount)
+          inode.asReference().getReferredINode()).getReferenceCount() > 1) {
+        throw new SnapshotException("Concat: the source file " + srcs[i]
+            + " is referred by some other reference in some snapshot.");
+      }
+
+      allSrcInodes[i] = inode.asFile();
     }
-    trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
+    trgInode.concatBlocks(allSrcInodes);
     
     // since we are in the same dir - we can use same parent to remove files
     int count = 0;
@@ -970,15 +1233,18 @@ public class FSDirectory implements Clos
       if(nodeToRemove == null) continue;
       
       nodeToRemove.setBlocks(null);
-      trgParent.removeChild(nodeToRemove);
+      trgParent.removeChild(nodeToRemove, trgLatestSnapshot, null);
       inodeMap.remove(nodeToRemove);
       count++;
     }
     
-    trgInode.setModificationTimeForce(timestamp);
-    trgParent.setModificationTime(timestamp);
+    // update inodeMap
+    removeFromInodeMap(Arrays.asList(allSrcInodes));
+    
+    trgInode.setModificationTime(timestamp, trgLatestSnapshot, inodeMap);
+    trgParent.updateModificationTime(timestamp, trgLatestSnapshot, inodeMap);
     // update quota on the parent directory ('count' files removed, 0 space)
-    unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
+    unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
   }
 
   /**
@@ -986,29 +1252,68 @@ public class FSDirectory implements Clos
    * 
    * @param src Path of a directory to delete
    * @param collectedBlocks Blocks under the deleted directory
+   * @param removedINodes INodes that should be removed from {@link #inodeMap}
    * @return true on successful deletion; else false
    */
-  boolean delete(String src, BlocksMapUpdateInfo collectedBlocks) 
-    throws UnresolvedLinkException {
+  boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
+      List<INode> removedINodes) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
     waitForReady();
     long now = now();
-    int filesRemoved;
+    final long filesRemoved;
     writeLock();
     try {
-      filesRemoved = unprotectedDelete(src, collectedBlocks, now);
+      final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(
+          normalizePath(src), false);
+      if (!deleteAllowed(inodesInPath, src) ) {
+        filesRemoved = -1;
+      } else {
+        // Before removing the node, first check if the targetNode is for a
+        // snapshottable dir with snapshots, or its descendants have
+        // snapshottable dir with snapshots
+        final INode targetNode = inodesInPath.getLastINode();
+        List<INodeDirectorySnapshottable> snapshottableDirs = 
+            new ArrayList<INodeDirectorySnapshottable>();
+        checkSnapshot(targetNode, snapshottableDirs);
+        filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
+            removedINodes, now);
+        if (snapshottableDirs.size() > 0) {
+          // There are some snapshottable directories without snapshots to be
+          // deleted. Need to update the SnapshotManager.
+          namesystem.removeSnapshottableDirs(snapshottableDirs);
+        }
+      }
     } finally {
       writeUnlock();
     }
-    if (filesRemoved <= 0) {
+    if (filesRemoved < 0) {
       return false;
     }
-    incrDeletedFileCount(filesRemoved);
-    // Blocks will be deleted later by the caller of this method
-    getFSNamesystem().removePathAndBlocks(src, null);
     fsImage.getEditLog().logDelete(src, now);
+    incrDeletedFileCount(filesRemoved);
+    // Blocks/INodes will be handled later by the caller of this method
+    getFSNamesystem().removePathAndBlocks(src, null, null);
+    return true;
+  }
+  
+  private static boolean deleteAllowed(final INodesInPath iip,
+      final String src) {
+    final INode[] inodes = iip.getINodes(); 
+    if (inodes == null || inodes.length == 0
+        || inodes[inodes.length - 1] == null) {
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+            + "failed to remove " + src + " because it does not exist");
+      }
+      return false;
+    } else if (inodes.length == 1) { // src is the root
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
+          + "failed to remove " + src
+          + " because the root is not allowed to be deleted");
+      return false;
+    }
     return true;
   }
   
@@ -1018,12 +1323,14 @@ public class FSDirectory implements Clos
   boolean isNonEmptyDirectory(String path) throws UnresolvedLinkException {
     readLock();
     try {
-      final INode inode = rootDir.getNode(path, false);
+      final INodesInPath inodesInPath = rootDir.getLastINodeInPath(path, false);
+      final INode inode = inodesInPath.getINode(0);
       if (inode == null || !inode.isDirectory()) {
         //not found or not a directory
         return false;
       }
-      return ((INodeDirectory)inode).getChildrenList().size() != 0;
+      final Snapshot s = inodesInPath.getPathSnapshot();
+      return !inode.asDirectory().getChildrenList(s).isEmpty();
     } finally {
       readUnlock();
     }
@@ -1037,95 +1344,131 @@ public class FSDirectory implements Clos
    * <br>
    * @param src a string representation of a path to an inode
    * @param mtime the time the inode is removed
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */ 
-  void unprotectedDelete(String src, long mtime) 
-    throws UnresolvedLinkException {
+  void unprotectedDelete(String src, long mtime) throws UnresolvedLinkException,
+      QuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-    int filesRemoved = unprotectedDelete(src, collectedBlocks, mtime);
-    if (filesRemoved > 0) {
-      getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+    List<INode> removedINodes = new ArrayList<INode>();
+
+    final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(
+        normalizePath(src), false);
+    final long filesRemoved = deleteAllowed(inodesInPath, src) ? 
+        unprotectedDelete(inodesInPath, collectedBlocks, 
+            removedINodes, mtime) : -1;
+    if (filesRemoved >= 0) {
+      getFSNamesystem().removePathAndBlocks(src, collectedBlocks, 
+          removedINodes);
     }
   }
   
   /**
    * Delete a path from the name space
    * Update the count at each ancestor directory with quota
-   * @param src a string representation of a path to an inode
+   * @param iip the inodes resolved from the path
    * @param collectedBlocks blocks collected from the deleted path
+   * @param removedINodes inodes that should be removed from {@link #inodeMap}
    * @param mtime the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
    */ 
-  int unprotectedDelete(String src, BlocksMapUpdateInfo collectedBlocks, 
-      long mtime) throws UnresolvedLinkException {
+  long unprotectedDelete(INodesInPath iip, BlocksMapUpdateInfo collectedBlocks,
+      List<INode> removedINodes, long mtime) throws QuotaExceededException {
     assert hasWriteLock();
-    src = normalizePath(src);
 
-    final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, false);
-    final INode[] inodes = inodesInPath.getINodes();
-    INode targetNode = inodes[inodes.length-1];
+    // check if target node exists
+    INode targetNode = iip.getLastINode();
+    if (targetNode == null) {
+      return -1;
+    }
 
-    if (targetNode == null) { // non-existent src
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-            +"failed to remove "+src+" because it does not exist");
-      }
-      return 0;
+    // record modification
+    final Snapshot latestSnapshot = iip.getLatestSnapshot();
+    targetNode = targetNode.recordModification(latestSnapshot, inodeMap);
+    iip.setLastINode(targetNode);
+
+    // Remove the node from the namespace
+    long removed = removeLastINode(iip);
+    if (removed == -1) {
+      return -1;
     }
-    if (inodes.length == 1) { // src is the root
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
-          "failed to remove " + src +
-          " because the root is not allowed to be deleted");
+
+    // set the parent's modification time
+    final INodeDirectory parent = targetNode.getParent();
+    parent.updateModificationTime(mtime, latestSnapshot, inodeMap);
+    if (removed == 0) {
       return 0;
     }
-    // Remove the node from the namespace
-    targetNode = removeLastINode(inodesInPath);
-    if (targetNode == null) {
-      return 0;
+    
+    // collect block
+    if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
+      targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+    } else {
+      Quota.Counts counts = targetNode.cleanSubtree(null, latestSnapshot,
+          collectedBlocks, removedINodes);
+      parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
+          -counts.get(Quota.DISKSPACE), true, Snapshot.INVALID_ID);
+      removed = counts.get(Quota.NAMESPACE);
     }
-    // set the parent's modification time
-    inodes[inodes.length - 2].setModificationTime(mtime);
-    int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-          +src+" is removed");
+          + targetNode.getFullPathName() + " is removed");
     }
-    removeAllFromInodesFromMap(targetNode);
-    return filesRemoved;
+    return removed;
   }
   
   /**
-   * Replaces the specified inode with the specified one.
+   * Check if the given INode (or one of its descendants) is snapshottable and
+   * already has snapshots.
+   * 
+   * @param target The given INode
+   * @param snapshottableDirs The list of directories that are snapshottable 
+   *                          but do not have snapshots yet
+   */
+  private static void checkSnapshot(INode target,
+      List<INodeDirectorySnapshottable> snapshottableDirs) throws IOException {
+    if (target.isDirectory()) {
+      INodeDirectory targetDir = target.asDirectory();
+      if (targetDir.isSnapshottable()) {
+        INodeDirectorySnapshottable ssTargetDir = 
+            (INodeDirectorySnapshottable) targetDir;
+        if (ssTargetDir.getNumSnapshots() > 0) {
+          throw new IOException("The direcotry " + ssTargetDir.getFullPathName()
+              + " cannot be deleted since " + ssTargetDir.getFullPathName()
+              + " is snapshottable and already has snapshots");
+        } else {
+          if (snapshottableDirs != null) {
+            snapshottableDirs.add(ssTargetDir);
+          }
+        }
+      } 
+      for (INode child : targetDir.getChildrenList(null)) {
+        checkSnapshot(child, snapshottableDirs);
+      }
+    }
+  }
+
+  /**
+   * Replaces the specified INodeFile with the specified one.
    */
-  public void replaceNode(String path, INodeFile oldnode, INodeFile newnode)
-      throws IOException, UnresolvedLinkException {    
+  void replaceINodeFile(String path, INodeFile oldnode,
+      INodeFile newnode) throws IOException {
     writeLock();
     try {
-      unprotectedReplaceNode(path, oldnode, newnode);
+      unprotectedReplaceINodeFile(path, oldnode, newnode);
     } finally {
       writeUnlock();
     }
   }
-  
-  void unprotectedReplaceNode(String path, INodeFile oldnode, INodeFile newnode)
-      throws IOException, UnresolvedLinkException {
-    assert hasWriteLock();
-    INodeDirectory parent = oldnode.parent;
-    // Remove the node from the namespace 
-    if (!oldnode.removeNode()) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " +
-                                   "failed to remove " + path);
-      throw new IOException("FSDirectory.replaceNode: " +
-                            "failed to remove " + path);
-    } 
-    removeFromInodeMap(oldnode);
-    
-    // Parent should be non-null, otherwise oldnode.removeNode() will return
-    // false
-    newnode.setLocalName(oldnode.getLocalNameBytes());
-    parent.addChild(newnode, true);
-    inodeMap.put(newnode);
-    
+
+  /** Replace an INodeFile and record modification for the latest snapshot. */
+  void unprotectedReplaceINodeFile(final String path, final INodeFile oldnode,
+      final INodeFile newnode) {
+    Preconditions.checkState(hasWriteLock());
+
+    oldnode.getParent().replaceChild(oldnode, newnode, inodeMap);
+    oldnode.clear();
+
     /* Currently oldnode and newnode are assumed to contain the same
      * blocks. Otherwise, blocks need to be removed from the blocksMap.
      */
@@ -1151,24 +1494,31 @@ public class FSDirectory implements Clos
 
     readLock();
     try {
-      INode targetNode = rootDir.getNode(srcs, true);
+      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
+        return getSnapshotsListing(srcs, startAfter);
+      }
+      final INodesInPath inodesInPath = rootDir.getLastINodeInPath(srcs, true);
+      final Snapshot snapshot = inodesInPath.getPathSnapshot();
+      final INode targetNode = inodesInPath.getINode(0);
       if (targetNode == null)
         return null;
       
       if (!targetNode.isDirectory()) {
         return new DirectoryListing(
             new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
-                targetNode, needLocation)}, 0);
+                targetNode, needLocation, snapshot)}, 0);
       }
-      INodeDirectory dirInode = (INodeDirectory)targetNode;
-      List<INode> contents = dirInode.getChildrenList();
-      int startChild = dirInode.nextChild(startAfter);
+
+      final INodeDirectory dirInode = targetNode.asDirectory();
+      final ReadOnlyList<INode> contents = dirInode.getChildrenList(snapshot);
+      int startChild = INodeDirectory.nextChild(contents, startAfter);
       int totalNumChildren = contents.size();
       int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
       HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
       for (int i=0; i<numOfListing; i++) {
         INode cur = contents.get(startChild+i);
-        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation);
+        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
+            needLocation, snapshot);
       }
       return new DirectoryListing(
           listing, totalNumChildren-startChild-numOfListing);
@@ -1176,6 +1526,35 @@ public class FSDirectory implements Clos
       readUnlock();
     }
   }
+  
+  /**
+   * Get a listing of all the snapshots of a snapshottable directory
+   */
+  private DirectoryListing getSnapshotsListing(String src, byte[] startAfter)
+      throws UnresolvedLinkException, IOException {
+    Preconditions.checkState(hasReadLock());
+    Preconditions.checkArgument(
+        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), 
+        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
+    
+    final String dirPath = normalizePath(src.substring(0,
+        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
+    
+    final INode node = this.getINode(dirPath);
+    final INodeDirectorySnapshottable dirNode = INodeDirectorySnapshottable
+        .valueOf(node, dirPath);
+    final ReadOnlyList<Snapshot> snapshots = dirNode.getSnapshotList();
+    int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
+    skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
+    int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit);
+    final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+    for (int i = 0; i < numOfListing; i++) {
+      Root sRoot = snapshots.get(i + skipSize).getRoot();
+      listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot, null);
+    }
+    return new DirectoryListing(
+        listing, snapshots.size() - skipSize - numOfListing);
+  }
 
   /** Get the file info for a specific file.
    * @param src The string representation of the path to the file
@@ -1188,17 +1567,36 @@ public class FSDirectory implements Clos
     String srcs = normalizePath(src);
     readLock();
     try {
-      INode targetNode = rootDir.getNode(srcs, resolveLink);
-      if (targetNode == null) {
-        return null;
-      }
-      else {
-        return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
+      if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
+        return getFileInfo4DotSnapshot(srcs);
       }
+      final INodesInPath inodesInPath = rootDir.getLastINodeInPath(srcs, resolveLink);
+      final INode i = inodesInPath.getINode(0);
+      return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
+          inodesInPath.getPathSnapshot());
     } finally {
       readUnlock();
     }
   }
+  
+  private HdfsFileStatus getFileInfo4DotSnapshot(String src)
+      throws UnresolvedLinkException {
+    Preconditions.checkArgument(
+        src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR), 
+        "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
+    
+    final String dirPath = normalizePath(src.substring(0,
+        src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
+    
+    final INode node = this.getINode(dirPath);
+    if (node != null
+        && node.isDirectory()
+        && node.asDirectory() instanceof INodeDirectorySnapshottable) {
+      return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
+          HdfsFileStatus.EMPTY_NAME, -1L);
+    }
+    return null;
+  }
 
   /**
    * Get the blocks associated with the file.
@@ -1208,35 +1606,76 @@ public class FSDirectory implements Clos
     readLock();
     try {
       final INode i = rootDir.getNode(src, false);
-      return i != null && i.isFile()? ((INodeFile)i).getBlocks(): null;
+      return i != null && i.isFile()? i.asFile().getBlocks(): null;
     } finally {
       readUnlock();
     }
   }
 
+
+  INodesInPath getExistingPathINodes(byte[][] components)
+      throws UnresolvedLinkException {
+    return INodesInPath.resolve(rootDir, components);
+  }
+
   /**
    * Get {@link INode} associated with the file / directory.
    */
-  INode getINode(String src) throws UnresolvedLinkException {
+  public INode getINode(String src) throws UnresolvedLinkException {
+    return getLastINodeInPath(src).getINode(0);
+  }
+
+  /**
+   * Get the {@link INodesInPath} containing the last inode of the given path.
+   */
+  public INodesInPath getLastINodeInPath(String src)
+       throws UnresolvedLinkException {
     readLock();
     try {
-      return rootDir.getNode(src, true);
+      return rootDir.getLastINodeInPath(src, true);
     } finally {
       readUnlock();
     }
   }
   
-  
+  /**
+   * Get the {@link INodesInPath} of the given path, resolved for writing.
+   */
+  public INodesInPath getINodesInPath4Write(String src
+      ) throws UnresolvedLinkException, SnapshotAccessControlException {
+    readLock();
+    try {
+      return rootDir.getINodesInPath4Write(src, true);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Get {@link INode} associated with the file / directory.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  public INode getINode4Write(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    readLock();
+    try {
+      return rootDir.getINode4Write(src, true);
+    } finally {
+      readUnlock();
+    }
+  }
+
   /** 
    * Check whether the filepath could be created
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */
-  boolean isValidToCreate(String src) throws UnresolvedLinkException {
+  boolean isValidToCreate(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
     String srcs = normalizePath(src);
     readLock();
     try {
-      if (srcs.startsWith("/") && 
-          !srcs.endsWith("/") && 
-          rootDir.getNode(srcs, false) == null) {
+      if (srcs.startsWith("/") && !srcs.endsWith("/")
+          && rootDir.getINode4Write(srcs, false) == null) {
         return true;
       } else {
         return false;
@@ -1259,6 +1698,22 @@ public class FSDirectory implements Clos
       readUnlock();
     }
   }
+  
+  /**
+   * Check whether the path specifies a directory
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  boolean isDirMutable(String src) throws UnresolvedLinkException,
+      SnapshotAccessControlException {
+    src = normalizePath(src);
+    readLock();
+    try {
+      INode node = rootDir.getINode4Write(src, false);
+      return node != null && node.isDirectory();
+    } finally {
+      readUnlock();
+    }
+  }
 
   /** Updates namespace and diskspace consumed for all
    * directories until the parent directory of file represented by path.
@@ -1270,31 +1725,35 @@ public class FSDirectory implements Clos
    * @throws FileNotFoundException if path does not exist.
    */
   void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
-      throws QuotaExceededException, FileNotFoundException, UnresolvedLinkException {
+      throws QuotaExceededException, FileNotFoundException,
+          UnresolvedLinkException, SnapshotAccessControlException {
     writeLock();
     try {
-      final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false);
-      final INode[] inodes = inodesInPath.getINodes();
-      int len = inodes.length;
-      if (inodes[len - 1] == null) {
+      final INodesInPath iip = rootDir.getINodesInPath4Write(path, false);
+      if (iip.getLastINode() == null) {
         throw new FileNotFoundException("Path not found: " + path);
       }
-      updateCount(inodesInPath, len-1, nsDelta, dsDelta, true);
+      updateCount(iip, nsDelta, dsDelta, true);
     } finally {
       writeUnlock();
     }
   }
   
+  private void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
+      boolean checkQuota) throws QuotaExceededException {
+    updateCount(iip, iip.getINodes().length - 1, nsDelta, dsDelta, checkQuota);
+  }
+
   /** update count of each inode with quota
    * 
-   * @param inodes an array of inodes on a path
+   * @param iip inodes in a path
    * @param numOfINodes the number of inodes to update starting from index 0
    * @param nsDelta the delta change of namespace
    * @param dsDelta the delta change of diskspace
    * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private void updateCount(INodesInPath inodesInPath, int numOfINodes, 
+  private void updateCount(INodesInPath iip, int numOfINodes, 
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     assert hasWriteLock();
@@ -1302,14 +1761,14 @@ public class FSDirectory implements Clos
       //still initializing. do not check or update quotas.
       return;
     }
-    final INode[] inodes = inodesInPath.getINodes();
+    final INode[] inodes = iip.getINodes();
     if (numOfINodes > inodes.length) {
       numOfINodes = inodes.length;
     }
     if (checkQuota) {
       verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
     }
-    unprotectedUpdateCount(inodesInPath, numOfINodes, nsDelta, dsDelta);
+    unprotectedUpdateCount(iip, numOfINodes, nsDelta, dsDelta);
   }
   
   /** 
@@ -1322,32 +1781,27 @@ public class FSDirectory implements Clos
     try {
       updateCount(inodesInPath, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
-      NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
+      NameNode.LOG.error("BUG: unexpected exception ", e);
     }
   }
   
   /**
    * updates quota without verification
    * callers responsibility is to make sure quota is not exceeded
-   * @param inodes
-   * @param numOfINodes
-   * @param nsDelta
-   * @param dsDelta
    */
-  private void unprotectedUpdateCount(INodesInPath inodesInPath,
+  private static void unprotectedUpdateCount(INodesInPath inodesInPath,
       int numOfINodes, long nsDelta, long dsDelta) {
-    assert hasWriteLock();
     final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
         INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-        node.addSpaceConsumed(nsDelta, dsDelta);
+        node.addSpaceConsumed2Cache(nsDelta, dsDelta);
       }
     }
   }
   
   /** Return the name of the path represented by inodes at [0, pos] */
-  private static String getFullPathName(INode[] inodes, int pos) {
+  static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();
     if (inodes[0].isRoot()) {
       if (pos == 0) return Path.SEPARATOR;
@@ -1361,11 +1815,14 @@ public class FSDirectory implements Clos
     return fullPathName.toString();
   }
 
-  /** Return the full path name of the specified inode */
-  static String getFullPathName(INode inode) {
-    // calculate the depth of this inode from root
+  /**
+   * @return the relative path of an inode from one of its ancestors,
+   *         represented by an array of inodes.
+   */
+  private static INode[] getRelativePathINodes(INode inode, INode ancestor) {
+    // calculate the depth of this inode from the ancestor
     int depth = 0;
-    for (INode i = inode; i != null; i = i.parent) {
+    for (INode i = inode; i != null && !i.equals(ancestor); i = i.getParent()) {
       depth++;
     }
     INode[] inodes = new INode[depth];
@@ -1378,9 +1835,19 @@ public class FSDirectory implements Clos
         return null;
       }
       inodes[depth-i-1] = inode;
-      inode = inode.parent;
+      inode = inode.getParent();
     }
-    return getFullPathName(inodes, depth-1);
+    return inodes;
+  }
+  
+  private static INode[] getFullPathINodes(INode inode) {
+    return getRelativePathINodes(inode, null);
+  }
+  
+  /** Return the full path name of the specified inode */
+  static String getFullPathName(INode inode) {
+    INode[] inodes = getFullPathINodes(inode);
+    return getFullPathName(inodes, inodes.length - 1);
   }
   
   /**
@@ -1399,11 +1866,12 @@ public class FSDirectory implements Clos
    * @throws QuotaExceededException if directory creation violates 
    *                                any quota limit
    * @throws UnresolvedLinkException if a symlink is encountered in src.                      
+   * @throws SnapshotAccessControlException if path is in RO snapshot
    */
   boolean mkdirs(String src, PermissionStatus permissions,
       boolean inheritPermission, long now)
       throws FileAlreadyExistsException, QuotaExceededException, 
-             UnresolvedLinkException {
+             UnresolvedLinkException, SnapshotAccessControlException {
     src = normalizePath(src);
     String[] names = INode.getPathNames(src);
     byte[][] components = INode.getPathComponents(names);
@@ -1411,9 +1879,12 @@ public class FSDirectory implements Clos
 
     writeLock();
     try {
-      INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
-          components.length, false);
-      INode[] inodes = inodesInPath.getINodes();
+      INodesInPath iip = getExistingPathINodes(components);
+      if (iip.isSnapshot()) {
+        throw new SnapshotAccessControlException(
+            "Modification on RO snapshot is disallowed");
+      }
+      INode[] inodes = iip.getINodes();
 
       // find the index of the first null in inodes[]
       StringBuilder pathbuilder = new StringBuilder();
@@ -1461,7 +1932,7 @@ public class FSDirectory implements Clos
       // create directories beginning from the first null index
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
-        unprotectedMkdir(namesystem.allocateNewInodeId(), inodesInPath, i,
+        unprotectedMkdir(namesystem.allocateNewInodeId(), iip, i,
             components[i], (i < lastInodeIndex) ? parentPermissions
                 : permissions, now);
         if (inodes[i] == null) {
@@ -1485,16 +1956,15 @@ public class FSDirectory implements Clos
     return true;
   }
 
-  INode unprotectedMkdir(long inodeId, String src,
-      PermissionStatus permissions, long timestamp)
-      throws QuotaExceededException, UnresolvedLinkException {
+  INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
+                          long timestamp) throws QuotaExceededException,
+                          UnresolvedLinkException {
     assert hasWriteLock();
     byte[][] components = INode.getPathComponents(src);
-    INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
-        components.length, false);
-    INode[] inodes = inodesInPath.getINodes();
+    INodesInPath iip = getExistingPathINodes(components);
+    INode[] inodes = iip.getINodes();
     final int pos = inodes.length - 1;
-    unprotectedMkdir(inodeId, inodesInPath, pos, components[pos], permissions,
+    unprotectedMkdir(inodeId, iip, pos, components[pos], permissions,
         timestamp);
     return inodes[pos];
   }
@@ -1514,31 +1984,19 @@ public class FSDirectory implements Clos
     }
   }
   
-  private INode getFromINodeMap(INode inode) {
-    readLock();
-    try {
-      return inodeMap.get(inode);
-    } finally {
-      readUnlock();
-    }
-  }
-  
   /**
    * Add the given child to the namespace.
    * @param src The full path name of the child node.
    * @throw QuotaExceededException is thrown if it violates quota limit
    */
-  private boolean addINode(String src, INode child)
-      throws QuotaExceededException, UnresolvedLinkException {
+  private boolean addINode(String src, INode child
+      ) throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
-    byte[] path = components[components.length-1];
-    child.setLocalName(path);
+    child.setLocalName(components[components.length-1]);
     cacheName(child);
     writeLock();
     try {
-      INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
-          components.length, false);
-      return addLastINode(inodesInPath, child, true);
+      return addLastINode(getExistingPathINodes(components), child, true);
     } finally {
       writeUnlock();
     }
@@ -1557,36 +2015,27 @@ public class FSDirectory implements Clos
    *          Pass null if a node is not being moved.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  private void verifyQuota(INode[] inodes, int pos, long nsDelta, long dsDelta,
-      INode commonAncestor) throws QuotaExceededException {
-    if (!ready) {
-      // Do not check quota if edits log is still being processed
-      return;
-    }
+  private static void verifyQuota(INode[] inodes, int pos, long nsDelta,
+      long dsDelta, INode commonAncestor) throws QuotaExceededException {
     if (nsDelta <= 0 && dsDelta <= 0) {
       // if quota is being freed or not being consumed
       return;
     }
-    if (pos>inodes.length) {
-      pos = inodes.length;
-    }
-    int i = pos - 1;
-    try {
-      // check existing components in the path  
-      for(; i >= 0; i--) {
-        if (commonAncestor == inodes[i]) {
-          // Moving an existing node. Stop checking for quota when common
-          // ancestor is reached
-          return;
-        }
-        if (inodes[i].isQuotaSet()) { // a directory with quota
-          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-          node.verifyQuota(nsDelta, dsDelta);
+
+    // check existing components in the path
+    for(int i = (pos > inodes.length? inodes.length: pos) - 1; i >= 0; i--) {
+      if (commonAncestor == inodes[i]) {
+        // Stop checking for quota when common ancestor is reached
+        return;
+      }
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        try {
+          ((INodeDirectoryWithQuota)inodes[i]).verifyQuota(nsDelta, dsDelta);
+        } catch (QuotaExceededException e) {
+          e.setPathName(getFullPathName(inodes, i));
+          throw e;
         }
       }
-    } catch (QuotaExceededException e) {
-      e.setPathName(getFullPathName(inodes, i));
-      throw e;
     }
   }
   
@@ -1594,71 +2043,98 @@ public class FSDirectory implements Clos
    * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
    * dstInodes[dstInodes.length-1]
    * 
-   * @param srcInodes directory from where node is being moved.
-   * @param dstInodes directory to where node is moved to.
+   * @param src directory from where node is being moved.
+   * @param dst directory to where node is moved to.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  private void verifyQuotaForRename(INode[] srcInodes, INode[]dstInodes)
+  private void verifyQuotaForRename(INode[] src, INode[] dst)
       throws QuotaExceededException {
     if (!ready) {
       // Do not check quota if edits log is still being processed
       return;
     }
-    INode srcInode = srcInodes[srcInodes.length - 1];
-    INode commonAncestor = null;
-    for(int i =0;srcInodes[i] == dstInodes[i]; i++) {
-      commonAncestor = srcInodes[i];
-    }
-    INode.DirCounts srcCounts = new INode.DirCounts();
-    srcInode.spaceConsumedInTree(srcCounts);
-    long nsDelta = srcCounts.getNsCount();
-    long dsDelta = srcCounts.getDsCount();
+    int i = 0;
+    for(; src[i] == dst[i]; i++);
+    // src[i - 1] is the last common ancestor.
+
+    final Quota.Counts delta = src[src.length - 1].computeQuotaUsage();
     
     // Reduce the required quota by dst that is being removed
-    INode dstInode = dstInodes[dstInodes.length - 1];
-    if (dstInode != null) {
-      INode.DirCounts dstCounts = new INode.DirCounts();
-      dstInode.spaceConsumedInTree(dstCounts);
-      nsDelta -= dstCounts.getNsCount();
-      dsDelta -= dstCounts.getDsCount();
-    }
-    verifyQuota(dstInodes, dstInodes.length - 1, nsDelta, dsDelta,
-        commonAncestor);
+    final int dstIndex = dst.length - 1;
+    if (dst[dstIndex] != null) {
+      delta.subtract(dst[dstIndex].computeQuotaUsage());
+    }
+    verifyQuota(dst, dstIndex, delta.get(Quota.NAMESPACE),
+        delta.get(Quota.DISKSPACE), src[i - 1]);
+  }
+
+  /** Verify if the snapshot name is legal. */
+  void verifySnapshotName(String snapshotName, String path)
+      throws PathComponentTooLongException, IllegalNameException {
+    final byte[] bytes = DFSUtil.string2Bytes(snapshotName);
+    verifyINodeName(bytes);
+    verifyMaxComponentLength(bytes, path, 0);
   }
   
+  /** Verify if the inode name is legal. */
+  void verifyINodeName(byte[] childName) throws IllegalNameException {
+    if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) {
+      String s = "\"" + HdfsConstants.DOT_SNAPSHOT_DIR + "\" is a reserved name.";
+      if (!ready) {
+        s += "  Please rename it before upgrade.";
+      }
+      throw new IllegalNameException(s);
+    }
+  }
+
+  /**
+   * Verify that a child's name is within maxComponentLength (0 = no limit).
+   * @throws PathComponentTooLongException name too long and ready is set.
+   */
+  void verifyMaxComponentLength(byte[] childName, Object parentPath, int pos)
+      throws PathComponentTooLongException {
+    if (maxComponentLength == 0) {
+      return;
+    }
+
+    final int length = childName.length;
+    if (length > maxComponentLength) {
+      final String p = parentPath instanceof INode[]?
+          getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
+      final PathComponentTooLongException e = new PathComponentTooLongException(
+          maxComponentLength, length, p, DFSUtil.bytes2String(childName));
+      if (ready) {
+        throw e;
+      } else {
+        // Do not throw if edits log is still being processed; only log it
+        NameNode.LOG.error("ERROR in FSDirectory.verifyMaxComponentLength", e);
+      }
+    }
+  }
+
   /**
-   * Verify that filesystem limit constraints are not violated
-   * @throws PathComponentTooLongException child's name is too long
-   * @throws MaxDirectoryItemsExceededException items per directory is exceeded
-   */
-  protected <T extends INode> void verifyFsLimits(INode[] pathComponents,
-      int pos, T child) throws FSLimitException {
-    boolean includeChildName = false;
-    try {
-      if (maxComponentLength != 0) {
-        int length = child.getLocalName().length();
-        if (length > maxComponentLength) {
-          includeChildName = true;
-          throw new PathComponentTooLongException(maxComponentLength, length);
-        }
-      }
-      if (maxDirItems != 0) {
-        INodeDirectory parent = (INodeDirectory)pathComponents[pos-1];
-        int count = parent.getChildrenList().size();
-        if (count >= maxDirItems) {
-          throw new MaxDirectoryItemsExceededException(maxDirItems, count);
-        }
-      }
-    } catch (FSLimitException e) {
-      String badPath = getFullPathName(pathComponents, pos-1);
-      if (includeChildName) {
-        badPath += Path.SEPARATOR + child.getLocalName();
-      }
-      e.setPathName(badPath);
-      // Do not throw if edits log is still being processed
-      if (ready) throw(e);
-      // log pre-existing paths that exceed limits
-      NameNode.LOG.error("FSDirectory.verifyFsLimits - " + e.getLocalizedMessage());
+   * Verify that the parent at pathComponents[pos-1] has room for a child.
+   * @throws MaxDirectoryItemsExceededException too many children and ready.
+   */
+  void verifyMaxDirItems(INode[] pathComponents, int pos)
+      throws MaxDirectoryItemsExceededException {
+    if (maxDirItems == 0) {
+      return;
+    }
+
+    final INodeDirectory parent = pathComponents[pos-1].asDirectory();
+    final int count = parent.getChildrenList(null).size();
+    if (count >= maxDirItems) {
+      final MaxDirectoryItemsExceededException e
+          = new MaxDirectoryItemsExceededException(maxDirItems, count);
+      e.setPathName(getFullPathName(pathComponents, pos - 1));
+      if (ready) {
+        throw e;
+      } else {
+        // Edits log still being processed: log (with path) instead of throwing
+        NameNode.LOG.error("FSDirectory.verifyMaxDirItems: "
+            + e.getLocalizedMessage());
+      }
     }
   }
   
@@ -1678,9 +2154,9 @@ public class FSDirectory implements Clos
    *         otherwise return true;
    * @throw QuotaExceededException is thrown if it violates quota limit
    */
-  private boolean addChild(INodesInPath inodesInPath, int pos,
+  private boolean addChild(INodesInPath iip, int pos,
       INode child, boolean checkQuota) throws QuotaExceededException {
-    final INode[] inodes = inodesInPath.getINodes();
+    final INode[] inodes = iip.getINodes();
     // Disallow creation of /.reserved. This may be created when loading
     // editlog/fsimage during upgrade since /.reserved was a valid name in older
     // release. This may also be called when a user tries to create a file
@@ -1699,21 +2175,31 @@ public class FSDirectory implements Clos
     // original location becase a quota violation would cause the the item
     // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
-      verifyFsLimits(inodes, pos, child);
+      verifyMaxComponentLength(child.getLocalNameBytes(), inodes, pos);
+      verifyMaxDirItems(inodes, pos);
     }
+    // always verify inode name
+    verifyINodeName(child.getLocalNameBytes());
     
-    INode.DirCounts counts = new INode.DirCounts();
-    child.spaceConsumedInTree(counts);
-    updateCount(inodesInPath, pos, counts.getNsCount(), counts.getDsCount(), checkQuota);
-    if (inodes[pos-1] == null) {
-      throw new NullPointerException("Panic: parent does not exist");
+    final Quota.Counts counts = child.computeQuotaUsage();
+    updateCount(iip, pos,
+        counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
+    final INodeDirectory parent = inodes[pos-1].asDirectory();
+    boolean added = false;
+    try {
+      added = parent.addChild(child, true, iip.getLatestSnapshot(),
+          inodeMap);
+    } catch (QuotaExceededException e) {
+      updateCountNoQuotaCheck(iip, pos,
+          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+      throw e;
     }
-    
-    final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
     if (!added) {
-      updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
+      updateCountNoQuotaCheck(iip, pos,
+          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
     } else {
-      inodeMap.put(child);
+      iip.setINode(pos - 1, child.getParent());
+      addToInodeMap(child);
     }
     return added;
   }
@@ -1730,20 +2216,36 @@ public class FSDirectory implements Clos
   /**
    * Remove the last inode in the path from the namespace.
    * Count of each ancestor with quota is also updated.
-   * @return the removed node; null if the removal fails.
+   * @return -1 for failing to remove;
+   *          0 for removing a reference whose referred inode has other 
+   *            reference nodes;
+   *         >0 otherwise. 
    */
-  private INode removeLastINode(final INodesInPath inodesInPath) {
-    final INode[] inodes = inodesInPath.getINodes();
-    final int pos = inodes.length - 1;
-    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
-    if (removedNode != null) {
-      INode.DirCounts counts = new INode.DirCounts();
-      removedNode.spaceConsumedInTree(counts);
-      updateCountNoQuotaCheck(inodesInPath, pos,
-                  -counts.getNsCount(), -counts.getDsCount());
-      removeFromInodeMap(removedNode);
+  private long removeLastINode(final INodesInPath iip)
+      throws QuotaExceededException {
+    final Snapshot latestSnapshot = iip.getLatestSnapshot();
+    final INode last = iip.getLastINode();
+    final INodeDirectory parent = iip.getINode(-2).asDirectory();
+    if (!parent.removeChild(last, latestSnapshot, inodeMap)) {
+      return -1;
+    }
+    INodeDirectory newParent = last.getParent();
+    if (parent != newParent) {
+      iip.setINode(-2, newParent);
+    }
+    
+    if (!last.isInLatestSnapshot(latestSnapshot)) {
+      final Quota.Counts counts = last.computeQuotaUsage();
+      updateCountNoQuotaCheck(iip, iip.getINodes().length - 1,
+          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+
+      if (INodeReference.tryRemoveReference(last) > 0) {
+        return 0;
+      } else {
+        return counts.get(Quota.NAMESPACE);
+      }
     }
-    return removedNode;
+    return 1;
   }
   
   /**
@@ -1772,105 +2274,49 @@ public class FSDirectory implements Clos
     }
   }
 
-  /** This method is always called with writeLock held */
-  final void addToInodeMapUnprotected(INode inode) {
-    inodeMap.put(inode);
+  public INodeMap getINodeMap() {
+    return inodeMap;
   }
   
-  /* This method is always called with writeLock held */
-  private final void removeFromInodeMap(INode inode) {
-    inodeMap.remove(inode);
+  /**
+   * This method is always called with writeLock of FSDirectory held.
+   */
+  public final void addToInodeMap(INode inode) {
+    if (inode instanceof INodeWithAdditionalFields) {
+      inodeMap.put((INodeWithAdditionalFields)inode);
+    }
   }
   
-  /** Remove all the inodes under given inode from the map */
-  private void removeAllFromInodesFromMap(INode inode) {
-    removeFromInodeMap(inode);
-    if (!inode.isDirectory()) {
-      return;
-    }
-    INodeDirectory dir = (INodeDirectory) inode;
-    for (INode child : dir.getChildrenList()) {
-      removeAllFromInodesFromMap(child);
+  /**
+   * This method is always called with writeLock of FSDirectory held.
+   */
+  public final void removeFromInodeMap(List<? extends INode> inodes) {
+    if (inodes != null) {
+      for (INode inode : inodes) {
+        if (inode != null && inode instanceof INodeWithAdditionalFields) {
+          inodeMap.remove(inode);
+        }
+      }
     }
-    dir.clearChildren();
   }
   
-  /** Update the count of each directory with quota in the namespace
-   * A directory's count is defined as the total number inodes in the tree
-   * rooted at the directory.
-   * 
-   * This is an update of existing state of the filesystem and does not
-   * throw QuotaExceededException.
+  /**
+   * Get the inode from inodeMap based on its inode id.
+   * @param id The given id
+   * @return The inode associated with the given id
    */
-  void updateCountForINodeWithQuota() {
-    updateCountForINodeWithQuota(this, rootDir, new INode.DirCounts(), 
-                                 new ArrayList<INode>(50));
+  public INode getInode(long id) {
+    readLock();
+    try {
+      return inodeMap.get(id);
+    } finally {
+      readUnlock();
+    }
   }
   
-  /** 
-   * Update the count of the directory if it has a quota and return the count
-   * 
-   * This does not throw a QuotaExceededException. This is just an update
-   * of of existing state and throwing QuotaExceededException does not help
-   * with fixing the state, if there is a problem.
-   * 
-   * @param dir the root of the tree that represents the directory
-   * @param counters counters for name space and disk space
-   * @param nodesInPath INodes for the each of components in the path.
-   */
-  private static void updateCountForINodeWithQuota(FSDirectory fsd,
-      INodeDirectory dir, INode.DirCounts counts, ArrayList<INode> nodesInPath) {
-    long parentNamespace = counts.nsCount;
-    long parentDiskspace = counts.dsCount;
-    
-    counts.nsCount = 1L;//for self. should not call node.spaceConsumedInTree()
-    counts.dsCount = 0L;
-    
-    /* We don't need nodesInPath if we could use 'parent' field in 
-     * INode. using 'parent' is not currently recommended. */
-    nodesInPath.add(dir);
-
-    for (INode child : dir.getChildrenList()) {
-      fsd.inodeMap.put(child);
-      if (child.isDirectory()) {
-        updateCountForINodeWithQuota(fsd, (INodeDirectory)child, 
-                                     counts, nodesInPath);
-      } else if (child.isSymlink()) {
-        counts.nsCount += 1;
-      } else { // reduce recursive calls
-        counts.nsCount += 1;
-        counts.dsCount += ((INodeFile)child).diskspaceConsumed();
-      }
-    }
-      
-    if (dir.isQuotaSet()) {
-      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(counts.nsCount,
-                                                      counts.dsCount);
-
-      // check if quota is violated for some reason.
-      if ((dir.getNsQuota() >= 0 && counts.nsCount > dir.getNsQuota()) ||
-          (dir.getDsQuota() >= 0 && counts.dsCount > dir.getDsQuota())) {
-
-        // can only happen because of a software bug. the bug should be fixed.
-        StringBuilder path = new StringBuilder(512);
-        for (INode n : nodesInPath) {
-          path.append('/');
-          path.append(n.getLocalName());
-        }
-        
-        NameNode.LOG.warn("Quota violation in image for " + path + 
-                          " (Namespace quota : " + dir.getNsQuota() +
-                          " consumed : " + counts.nsCount + ")" +
-                          " (Diskspace quota : " + dir.getDsQuota() +
-                          " consumed : " + counts.dsCount + ").");
-      }            
-    }
-      

[... 338 lines stripped ...]


Mime
View raw message