hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1451081 [1/2] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namen...
Date Thu, 28 Feb 2013 03:08:09 GMT
Author: szetszwo
Date: Thu Feb 28 03:08:08 2013
New Revision: 1451081

URL: http://svn.apache.org/r1451081
Log:
HDFS-4507. Update quota verification for snapshots.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/TestDiff.java

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt Thu Feb 28 03:08:08 2013
@@ -179,3 +179,5 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4523. Fix INodeFile replacement, TestQuota and javac errors from trunk
   merge.  (szetszwo)
+
+  HDFS-4507. Update quota verification for snapshots.  (szetszwo)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FSLimitException.java Thu Feb 28 03:08:08 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.Path;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -48,21 +47,29 @@ public abstract class FSLimitException e
   class PathComponentTooLongException extends FSLimitException {
     protected static final long serialVersionUID = 1L;
 
+    private String childName;
+
     protected PathComponentTooLongException() {}
 
     protected PathComponentTooLongException(String msg) {
       super(msg);
     }
     
-    public PathComponentTooLongException(long quota, long count) {
+    public PathComponentTooLongException(long quota, long count,
+        String parentPath, String childName) {
       super(quota, count);
+      setPathName(parentPath);
+      this.childName = childName;
+    }
+
+    String getParentPath() {
+      return pathName;
     }
 
     @Override
     public String getMessage() {
-      Path violator = new Path(pathName);
-      return "The maximum path component name limit of " + violator.getName() +
-      " in directory " + violator.getParent() +
+      return "The maximum path component name limit of " + childName +
+      " in directory " + getParentPath() +
       " is exceeded: limit=" + quota + " length=" + count; 
     }
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java Thu Feb 28 03:08:08 2013
@@ -26,6 +26,8 @@ import org.apache.hadoop.classification.
 public final class NSQuotaExceededException extends QuotaExceededException {
   protected static final long serialVersionUID = 1L;
   
+  private String prefix;
+  
   public NSQuotaExceededException() {}
 
   public NSQuotaExceededException(String msg) {
@@ -40,11 +42,19 @@ public final class NSQuotaExceededExcept
   public String getMessage() {
     String msg = super.getMessage();
     if (msg == null) {
-      return "The NameSpace quota (directories and files)" + 
+      msg = "The NameSpace quota (directories and files)" + 
       (pathName==null?"":(" of directory " + pathName)) + 
           " is exceeded: quota=" + quota + " file count=" + count; 
-    } else {
-      return msg;
+
+      if (prefix != null) {
+        msg = prefix + ": " + msg;
+      }
     }
+    return msg;
+  }
+
+  /** Set a prefix for the error message. */
+  public void setMessagePrefix(final String prefix) {
+    this.prefix = prefix;
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Feb 28 03:08:08 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@@ -838,7 +839,8 @@ public class FSDirectory implements Clos
     long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
     updateCount(inodesInPath, 0, dsDelta, true);
 
-    fileNode.setFileReplication(replication, inodesInPath.getLatestSnapshot());
+    fileNode = fileNode.setFileReplication(
+        replication, inodesInPath.getLatestSnapshot());
 
     if (oldReplication != null) {
       oldReplication[0] = oldRepl;
@@ -877,7 +879,7 @@ public class FSDirectory implements Clos
   
   void setPermission(String src, FsPermission permission)
       throws FileNotFoundException, UnresolvedLinkException,
-      SnapshotAccessControlException {
+      NSQuotaExceededException, SnapshotAccessControlException {
     writeLock();
     try {
       unprotectedSetPermission(src, permission);
@@ -889,7 +891,7 @@ public class FSDirectory implements Clos
   
   void unprotectedSetPermission(String src, FsPermission permissions)
       throws FileNotFoundException, UnresolvedLinkException,
-      SnapshotAccessControlException {
+      NSQuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
     final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(src, true);
     final INode inode = inodesInPath.getLastINode();
@@ -901,7 +903,7 @@ public class FSDirectory implements Clos
 
   void setOwner(String src, String username, String groupname)
       throws FileNotFoundException, UnresolvedLinkException,
-      SnapshotAccessControlException {
+      NSQuotaExceededException, SnapshotAccessControlException {
     writeLock();
     try {
       unprotectedSetOwner(src, username, groupname);
@@ -913,7 +915,7 @@ public class FSDirectory implements Clos
 
   void unprotectedSetOwner(String src, String username, String groupname)
       throws FileNotFoundException, UnresolvedLinkException,
-      SnapshotAccessControlException {
+      NSQuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
     final INodesInPath inodesInPath = rootDir.getINodesInPath4Write(src, true);
     INode inode = inodesInPath.getLastINode();
@@ -932,7 +934,8 @@ public class FSDirectory implements Clos
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
   public void concat(String target, String [] srcs) 
-      throws UnresolvedLinkException, SnapshotAccessControlException {
+      throws UnresolvedLinkException, NSQuotaExceededException,
+      SnapshotAccessControlException {
     writeLock();
     try {
       // actual move
@@ -956,7 +959,8 @@ public class FSDirectory implements Clos
    * NOTE: - it does not update quota (not needed for concat)
    */
   public void unprotectedConcat(String target, String [] srcs, long timestamp) 
-      throws UnresolvedLinkException, SnapshotAccessControlException {
+      throws UnresolvedLinkException, NSQuotaExceededException,
+      SnapshotAccessControlException {
     assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
@@ -1093,8 +1097,8 @@ public class FSDirectory implements Clos
    * @param mtime the time the inode is removed
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */ 
-  void unprotectedDelete(String src, long mtime) 
-    throws UnresolvedLinkException, SnapshotAccessControlException {
+  void unprotectedDelete(String src, long mtime) throws UnresolvedLinkException,
+      NSQuotaExceededException, SnapshotAccessControlException {
     assert hasWriteLock();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
 
@@ -1116,7 +1120,8 @@ public class FSDirectory implements Clos
    * @return the number of inodes deleted; 0 if no inodes are deleted.
    */ 
   int unprotectedDelete(INodesInPath inodesInPath,
-      BlocksMapUpdateInfo collectedBlocks, long mtime) {
+      BlocksMapUpdateInfo collectedBlocks, long mtime)
+          throws NSQuotaExceededException {
     assert hasWriteLock();
 
     // check if target node exists
@@ -1812,37 +1817,62 @@ public class FSDirectory implements Clos
   
   /**
    * Verify that filesystem limit constraints are not violated
-   * @throws PathComponentTooLongException child's name is too long
-   * @throws MaxDirectoryItemsExceededException items per directory is exceeded
    */
-  protected <T extends INode> void verifyFsLimits(INode[] pathComponents,
-      int pos, T child) throws FSLimitException {
-    boolean includeChildName = false;
-    try {
-      if (maxComponentLength != 0) {
-        int length = child.getLocalName().length();
-        if (length > maxComponentLength) {
-          includeChildName = true;
-          throw new PathComponentTooLongException(maxComponentLength, length);
-        }
+  void verifyFsLimits(INode[] pathComponents, int pos, INode child)
+      throws FSLimitException {
+    verifyMaxComponentLength(child.getLocalName(), pathComponents, pos);
+    verifyMaxDirItems(pathComponents, pos);
+  }
+
+  /**
+   * Verify child's name for fs limit.
+   * @throws PathComponentTooLongException child's name is too long.
+   */
+  public void verifyMaxComponentLength(String childName,
+      Object parentPath, int pos) throws PathComponentTooLongException {
+    if (maxComponentLength == 0) {
+      return;
+    }
+
+    final int length = childName.length();
+    if (length > maxComponentLength) {
+      final String p = parentPath instanceof INode[]?
+          getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
+      final PathComponentTooLongException e = new PathComponentTooLongException(
+          maxComponentLength, length, p, childName);
+      if (ready) {
+        throw e;
+      } else {
+        // Do not throw if edits log is still being processed
+        NameNode.LOG.error("FSDirectory.verifyMaxComponentLength: "
+            + e.getLocalizedMessage());
       }
-      if (maxDirItems != 0) {
-        INodeDirectory parent = (INodeDirectory)pathComponents[pos-1];
-        int count = parent.getChildrenList(null).size();
-        if (count >= maxDirItems) {
-          throw new MaxDirectoryItemsExceededException(maxDirItems, count);
-        }
+    }
+  }
+
+  /**
+   * Verify children size for fs limit.
+   * @throws MaxDirectoryItemsExceededException too many children.
+   */
+  private void verifyMaxDirItems(INode[] pathComponents, int pos)
+      throws MaxDirectoryItemsExceededException {
+    if (maxDirItems == 0) {
+      return;
+    }
+
+    final INodeDirectory parent = (INodeDirectory)pathComponents[pos-1];
+    final int count = parent.getChildrenList(null).size();
+    if (count >= maxDirItems) {
+      final MaxDirectoryItemsExceededException e
+          = new MaxDirectoryItemsExceededException(maxDirItems, count);
+      if (ready) {
+        e.setPathName(getFullPathName(pathComponents, pos - 1));
+        throw e;
+      } else {
+        // Do not throw if edits log is still being processed
+        NameNode.LOG.error("FSDirectory.verifyMaxDirItems: "
+            + e.getLocalizedMessage());
       }
-    } catch (FSLimitException e) {
-      String badPath = getFullPathName(pathComponents, pos-1);
-      if (includeChildName) {
-        badPath += Path.SEPARATOR + child.getLocalName();
-      }
-      e.setPathName(badPath);
-      // Do not throw if edits log is still being processed
-      if (ready) throw(e);
-      // log pre-existing paths that exceed limits
-      NameNode.LOG.error("FSDirectory.verifyFsLimits - " + e.getLocalizedMessage());
     }
   }
   
@@ -1900,20 +1930,22 @@ public class FSDirectory implements Clos
    * Remove the last inode in the path from the namespace.
    * Count of each ancestor with quota is also updated.
    * @return the removed node; null if the removal fails.
+   * @throws NSQuotaExceededException 
    */
-  private INode removeLastINode(final INodesInPath inodesInPath) {
+  private INode removeLastINode(final INodesInPath inodesInPath)
+      throws NSQuotaExceededException {
+    final Snapshot latestSnapshot = inodesInPath.getLatestSnapshot();
     final INode[] inodes = inodesInPath.getINodes();
     final int pos = inodes.length - 1;
     final boolean removed = ((INodeDirectory)inodes[pos-1]).removeChild(
-        inodes[pos], inodesInPath.getLatestSnapshot());
-    if (removed) {
+        inodes[pos], latestSnapshot);
+    if (removed && latestSnapshot == null) {
       inodesInPath.setINode(pos - 1, inodes[pos].getParent());
       final Quota.Counts counts = inodes[pos].computeQuotaUsage();
       updateCountNoQuotaCheck(inodesInPath, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
-      return inodes[pos];
     }
-    return null;
+    return removed? inodes[pos]: null;
   }
   
   /**
@@ -1986,7 +2018,7 @@ public class FSDirectory implements Clos
       final Snapshot latest = iip.getLatestSnapshot();
       if (dirNode instanceof INodeDirectoryWithQuota) { 
         // a directory with quota; so set the quota to the new value
-        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota, latest);
+        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
         if (!dirNode.isQuotaSet() && latest == null) {
           // will not come here for root because root's nsQuota is always set
           return dirNode.replaceSelf4INodeDirectory();
@@ -2033,7 +2065,7 @@ public class FSDirectory implements Clos
    * Sets the access time on the file/directory. Logs it in the transaction log.
    */
   void setTimes(String src, INode inode, long mtime, long atime, boolean force,
-      Snapshot latest) {
+      Snapshot latest) throws NSQuotaExceededException {
     boolean status = false;
     writeLock();
     try {
@@ -2047,7 +2079,7 @@ public class FSDirectory implements Clos
   }
 
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
-      throws UnresolvedLinkException {
+      throws UnresolvedLinkException, NSQuotaExceededException {
     assert hasWriteLock();
     final INodesInPath i = getLastINodeInPath(src); 
     return unprotectedSetTimes(src, i.getLastINode(), mtime, atime, force,
@@ -2055,7 +2087,8 @@ public class FSDirectory implements Clos
   }
 
   private boolean unprotectedSetTimes(String src, INode inode, long mtime,
-      long atime, boolean force, Snapshot latest) {
+      long atime, boolean force, Snapshot latest)
+          throws NSQuotaExceededException {
     assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Feb 28 03:08:08 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.Time.now;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -28,28 +30,26 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
 import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.util.Time.now;
-
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -62,9 +62,6 @@ import org.apache.hadoop.hdfs.util.MD5Fi
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.IdGenerator;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -726,7 +723,8 @@ public class FSImage implements Closeabl
     return lastAppliedTxId - prevLastAppliedTxId;
   }
 
-  /** Update the count of each directory with quota in the namespace
+  /**
+   * Update the count of each directory with quota in the namespace.
    * A directory's count is defined as the total number inodes in the tree
    * rooted at the directory.
    * 
@@ -734,39 +732,22 @@ public class FSImage implements Closeabl
    * throw QuotaExceededException.
    */
   static void updateCountForQuota(INodeDirectoryWithQuota root) {
-    updateCountForINodeWithQuota(root, new Quota.Counts(), new Stack<INode>());
+    updateCountForQuotaRecursively(root, new Quota.Counts());
   }
   
-  /** 
-   * Update the count of the directory if it has a quota and return the count
-   * 
-   * This does not throw a QuotaExceededException. This is just an update
-   * of of existing state and throwing QuotaExceededException does not help
-   * with fixing the state, if there is a problem.
-   * 
-   * @param dir the root of the tree that represents the directory
-   * @param counters counters for name space and disk space
-   * @param stack INodes for the each of components in the path.
-   */
-  private static void updateCountForINodeWithQuota(INodeDirectory dir,
-      Quota.Counts counts, Stack<INode> stack) {
-    // The stack is not needed since we could use the 'parent' field in INode.
-    // However, using 'parent' is not recommended.
-    stack.push(dir);
-
+  private static void updateCountForQuotaRecursively(INodeDirectory dir,
+      Quota.Counts counts) {
     final long parentNamespace = counts.get(Quota.NAMESPACE);
     final long parentDiskspace = counts.get(Quota.DISKSPACE);
     
-    counts.add(Quota.NAMESPACE, 1);
+    dir.computeQuotaUsage4CurrentDirectory(counts);
+
     for (INode child : dir.getChildrenList(null)) {
       if (child.isDirectory()) {
-        updateCountForINodeWithQuota((INodeDirectory)child, counts, stack);
+        updateCountForQuotaRecursively((INodeDirectory)child, counts);
       } else {
         // file or symlink: count here to reduce recursive calls.
-        counts.add(Quota.NAMESPACE, 1);
-        if (child.isFile()) {
-          counts.add(Quota.DISKSPACE, ((INodeFile)child).diskspaceConsumed());
-        }
+        child.computeQuotaUsage(counts, false);
       }
     }
       
@@ -774,24 +755,20 @@ public class FSImage implements Closeabl
       // check if quota is violated. It indicates a software bug.
       final long namespace = counts.get(Quota.NAMESPACE) - parentNamespace;
       if (Quota.isViolated(dir.getNsQuota(), namespace)) {
-        final INode[] inodes = stack.toArray(new INode[stack.size()]);
         LOG.error("BUG: Namespace quota violation in image for "
-            + FSDirectory.getFullPathName(inodes, inodes.length)
+            + dir.getFullPathName()
             + " quota = " + dir.getNsQuota() + " < consumed = " + namespace);
       }
 
       final long diskspace = counts.get(Quota.DISKSPACE) - parentDiskspace;
       if (Quota.isViolated(dir.getDsQuota(), diskspace)) {
-        final INode[] inodes = stack.toArray(new INode[stack.size()]);
         LOG.error("BUG: Diskspace quota violation in image for "
-            + FSDirectory.getFullPathName(inodes, inodes.length)
+            + dir.getFullPathName()
             + " quota = " + dir.getDsQuota() + " < consumed = " + diskspace);
       }
 
       ((INodeDirectoryWithQuota)dir).setSpaceConsumed(namespace, diskspace);
     }
-      
-    stack.pop();
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Thu Feb 28 03:08:08 2013
@@ -319,7 +319,7 @@ public class FSImageFormat {
     long dsQuota = root.getDsQuota();
     FSDirectory fsDir = namesystem.dir;
     if (nsQuota != -1 || dsQuota != -1) {
-      fsDir.rootDir.setQuota(nsQuota, dsQuota, null);
+      fsDir.rootDir.setQuota(nsQuota, dsQuota);
     }
     fsDir.rootDir.cloneModificationTime(root);
     fsDir.rootDir.clonePermissionStatus(root);    
@@ -503,7 +503,7 @@ public class FSImageFormat {
    */
   private void addToParent(INodeDirectory parent, INode child) {
     // NOTE: This does not update space counts for parents
-    if (!parent.addChild(child, false, null)) {
+    if (!parent.addChild(child)) {
       return;
     }
     namesystem.dir.cacheName(child);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Thu Feb 28 03:08:08 2013
@@ -32,12 +32,14 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.SignedBytes;
 
 /**
@@ -158,11 +160,8 @@ public abstract class INode implements D
   public final PermissionStatus getPermissionStatus() {
     return getPermissionStatus(null);
   }
-  private INode updatePermissionStatus(PermissionStatusFormat f, long n,
-      Snapshot latest) {
-    final INode nodeToUpdate = recordModification(latest);
-    nodeToUpdate.permission = f.combine(n, permission);
-    return nodeToUpdate;
+  private void updatePermissionStatus(PermissionStatusFormat f, long n) {
+    this.permission = f.combine(n, permission);
   }
   /**
    * @param snapshot
@@ -183,9 +182,16 @@ public abstract class INode implements D
     return getUserName(null);
   }
   /** Set user */
-  protected INode setUser(String user, Snapshot latest) {
+  final void setUser(String user) {
     int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
-    return updatePermissionStatus(PermissionStatusFormat.USER, n, latest);
+    updatePermissionStatus(PermissionStatusFormat.USER, n);
+  }
+  /** Set user */
+  final INode setUser(String user, Snapshot latest)
+      throws NSQuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest);
+    nodeToUpdate.setUser(user);
+    return nodeToUpdate;
   }
   /**
    * @param snapshot
@@ -206,9 +212,16 @@ public abstract class INode implements D
     return getGroupName(null);
   }
   /** Set group */
-  protected INode setGroup(String group, Snapshot latest) {
+  final void setGroup(String group) {
     int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
-    return updatePermissionStatus(PermissionStatusFormat.GROUP, n, latest);
+    updatePermissionStatus(PermissionStatusFormat.GROUP, n);
+  }
+  /** Set group */
+  final INode setGroup(String group, Snapshot latest)
+      throws NSQuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest);
+    nodeToUpdate.setGroup(group);
+    return nodeToUpdate;
   }
   /**
    * @param snapshot
@@ -232,9 +245,16 @@ public abstract class INode implements D
     return (short)PermissionStatusFormat.MODE.retrieve(permission);
   }
   /** Set the {@link FsPermission} of this {@link INode} */
-  INode setPermission(FsPermission permission, Snapshot latest) {
+  void setPermission(FsPermission permission) {
     final short mode = permission.toShort();
-    return updatePermissionStatus(PermissionStatusFormat.MODE, mode, latest);
+    updatePermissionStatus(PermissionStatusFormat.MODE, mode);
+  }
+  /** Set the {@link FsPermission} of this {@link INode} */
+  INode setPermission(FsPermission permission, Snapshot latest)
+      throws NSQuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest);
+    nodeToUpdate.setPermission(permission);
+    return nodeToUpdate;
   }
 
   /**
@@ -263,7 +283,8 @@ public abstract class INode implements D
    *         However, in some cases, this inode may be replaced with a new inode
    *         for maintaining snapshots. The current inode is then the new inode.
    */
-  abstract INode recordModification(final Snapshot latest);
+  abstract INode recordModification(final Snapshot latest)
+      throws NSQuotaExceededException;
 
   /**
    * Check whether it's a file.
@@ -335,7 +356,7 @@ public abstract class INode implements D
    * @return the number of deleted inodes in the subtree.
    */
   public abstract int cleanSubtree(final Snapshot snapshot, Snapshot prior,
-      BlocksMapUpdateInfo collectedBlocks);
+      BlocksMapUpdateInfo collectedBlocks) throws NSQuotaExceededException;
   
   /**
    * Destroy self and clear everything! If the INode is a file, this method
@@ -428,6 +449,16 @@ public abstract class INode implements D
   public abstract Content.Counts computeContentSummary(Content.Counts counts);
   
   /**
+   * Check and add namespace consumed to itself and the ancestors.
+   * @throws NSQuotaExceededException if quote is violated.
+   */
+  public void addNamespaceConsumed(int delta) throws NSQuotaExceededException {
+    if (parent != null) {
+      parent.addNamespaceConsumed(delta);
+    }
+  }
+
+  /**
    * Get the quota set for this inode
    * @return the quota if it is set; -1 otherwise
    */
@@ -447,7 +478,7 @@ public abstract class INode implements D
    * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages.
    */
   final Quota.Counts computeQuotaUsage() {
-    return computeQuotaUsage(new Quota.Counts());
+    return computeQuotaUsage(new Quota.Counts(), true);
   }
 
   /**
@@ -456,7 +487,8 @@ public abstract class INode implements D
    * @param counts The subtree counts for returning.
    * @return The same objects as the counts parameter.
    */
-  abstract Quota.Counts computeQuotaUsage(Quota.Counts counts);
+  public abstract Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache);
   
   /**
    * @return null if the local name is null; otherwise, return the local name.
@@ -574,8 +606,9 @@ public abstract class INode implements D
   }
 
   /** Update modification time if it is larger than the current value. */
-  public final INode updateModificationTime(long mtime, Snapshot latest) {
-    assert isDirectory();
+  public final INode updateModificationTime(long mtime, Snapshot latest)
+      throws NSQuotaExceededException {
+    Preconditions.checkState(isDirectory());
     if (mtime <= modificationTime) {
       return this;
     }
@@ -586,12 +619,15 @@ public abstract class INode implements D
     this.modificationTime = that.modificationTime;
   }
 
-  /**
-   * Always set the last modification time of inode.
-   */
-  public final INode setModificationTime(long modtime, Snapshot latest) {
+  /** Set the last modification time of inode. */
+  public final void setModificationTime(long modificationTime) {
+    this.modificationTime = modificationTime;
+  }
+  /** Set the last modification time of inode. */
+  public final INode setModificationTime(long modificationTime, Snapshot latest)
+      throws NSQuotaExceededException {
     final INode nodeToUpdate = recordModification(latest);
-    nodeToUpdate.modificationTime = modtime;
+    nodeToUpdate.setModificationTime(modificationTime);
     return nodeToUpdate;
   }
 
@@ -617,9 +653,16 @@ public abstract class INode implements D
   /**
    * Set last access time of inode.
    */
-  public INode setAccessTime(long atime, Snapshot latest) {
+  public void setAccessTime(long accessTime) {
+    this.accessTime = accessTime;
+  }
+  /**
+   * Set last access time of inode.
+   */
+  public INode setAccessTime(long accessTime, Snapshot latest)
+      throws NSQuotaExceededException {
     final INode nodeToUpdate = recordModification(latest);
-    nodeToUpdate.accessTime = atime;
+    nodeToUpdate.setAccessTime(accessTime);
     return nodeToUpdate;
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Thu Feb 28 03:08:08 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
@@ -97,18 +98,12 @@ public class INodeDirectory extends INod
     return false;
   }
 
-  private void assertChildrenNonNull() {
-    if (children == null) {
-      throw new AssertionError("children is null: " + this);
-    }
-  }
-
   private int searchChildren(byte[] name) {
-    return Collections.binarySearch(children, name);
+    return children == null? -1: Collections.binarySearch(children, name);
   }
 
-  protected int searchChildrenForExistingINode(final INode inode) {
-    assertChildrenNonNull();
+  private int searchChildrenForExistingINode(final INode inode) {
+    Preconditions.checkNotNull(children);
     final byte[] name = inode.getLocalNameBytes();
     final int i = searchChildren(name);
     if (i < 0) {
@@ -124,7 +119,8 @@ public class INodeDirectory extends INod
    * @param child the child inode to be removed
    * @param latest See {@link INode#recordModification(Snapshot)}.
    */
-  public boolean removeChild(INode child, Snapshot latest) {
+  public boolean removeChild(INode child, Snapshot latest)
+      throws NSQuotaExceededException {
     if (isInLatestSnapshot(latest)) {
       return replaceSelf4INodeDirectoryWithSnapshot()
           .removeChild(child, latest);
@@ -142,7 +138,7 @@ public class INodeDirectory extends INod
    * @return true if the child is removed; false if the child is not found.
    */
   protected final boolean removeChild(final INode child) {
-    assertChildrenNonNull();
+    Preconditions.checkNotNull(children);
     final int i = searchChildren(child.getLocalNameBytes());
     if (i < 0) {
       return false;
@@ -165,7 +161,7 @@ public class INodeDirectory extends INod
    * {@link INodeDirectoryWithSnapshot} depending on the latest snapshot.
    */
   INodeDirectoryWithQuota replaceSelf4Quota(final Snapshot latest,
-      final long nsQuota, final long dsQuota) {
+      final long nsQuota, final long dsQuota) throws NSQuotaExceededException {
     Preconditions.checkState(!(this instanceof INodeDirectoryWithQuota),
         "this is already an INodeDirectoryWithQuota, this=%s", this);
 
@@ -176,13 +172,13 @@ public class INodeDirectory extends INod
       return q;
     } else {
       final INodeDirectoryWithSnapshot s = new INodeDirectoryWithSnapshot(this);
-      s.setQuota(nsQuota, dsQuota, null);
+      s.setQuota(nsQuota, dsQuota);
       return replaceSelf(s).saveSelf2Snapshot(latest, this);
     }
   }
   /** Replace itself with an {@link INodeDirectorySnapshottable}. */
   public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
-      Snapshot latest) {
+      Snapshot latest) throws NSQuotaExceededException {
     Preconditions.checkState(!(this instanceof INodeDirectorySnapshottable),
         "this is already an INodeDirectorySnapshottable, this=%s", this);
     final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(this);
@@ -213,7 +209,7 @@ public class INodeDirectory extends INod
   }
 
   public void replaceChild(final INode oldChild, final INode newChild) {
-    assertChildrenNonNull();
+    Preconditions.checkNotNull(children);
     final int i = searchChildrenForExistingINode(newChild);
     final INode removed = children.set(i, newChild);
     Preconditions.checkState(removed == oldChild);
@@ -247,7 +243,8 @@ public class INodeDirectory extends INod
   }
 
   @Override
-  public INodeDirectory recordModification(Snapshot latest) {
+  public INodeDirectory recordModification(Snapshot latest)
+      throws NSQuotaExceededException {
     return isInLatestSnapshot(latest)?
         replaceSelf4INodeDirectoryWithSnapshot().recordModification(latest)
         : this;
@@ -259,7 +256,7 @@ public class INodeDirectory extends INod
    * @return the child inode, which may be replaced.
    */
   public INode saveChild2Snapshot(final INode child, final Snapshot latest,
-      final INode snapshotCopy) {
+      final INode snapshotCopy) throws NSQuotaExceededException {
     if (latest == null) {
       return child;
     }
@@ -387,11 +384,11 @@ public class INodeDirectory extends INod
       if (index >= 0) {
         existing.addNode(curNode);
       }
-      if (curNode instanceof INodeDirectorySnapshottable) {
+      if (curNode instanceof INodeDirectoryWithSnapshot) {
         //if the path is a non-snapshot path, update the latest snapshot.
         if (!existing.isSnapshot()) {
           existing.updateLatestSnapshot(
-              ((INodeDirectorySnapshottable)curNode).getLastSnapshot());
+              ((INodeDirectoryWithSnapshot)curNode).getLastSnapshot());
         }
       }
       if (curNode.isSymlink() && (!lastComp || (lastComp && resolveLink))) {
@@ -488,64 +485,64 @@ public class INodeDirectory extends INod
    *         otherwise, return true;
    */
   public boolean addChild(INode node, final boolean setModTime,
-      final Snapshot latest) {
+      final Snapshot latest) throws NSQuotaExceededException {
+    final int low = searchChildren(node.getLocalNameBytes());
+    if (low >= 0) {
+      return false;
+    }
+
     if (isInLatestSnapshot(latest)) {
       return replaceSelf4INodeDirectoryWithSnapshot()
           .addChild(node, setModTime, latest);
     }
-
-    if (children == null) {
-      children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
+    addChild(node, low);
+    if (setModTime) {
+      // update modification time of the parent directory
+      updateModificationTime(node.getModificationTime(), latest);
     }
+    return true;
+  }
+
+
+  /** The same as addChild(node, false, null, false) */
+  public boolean addChild(INode node) {
     final int low = searchChildren(node.getLocalNameBytes());
     if (low >= 0) {
       return false;
     }
-    node.parent = this;
-    children.add(-low - 1, node);
-    // update modification time of the parent directory
-    if (setModTime) {
-      updateModificationTime(node.getModificationTime(), latest);
-    }
-    if (node.getGroupName() == null) {
-      node.setGroup(getGroupName(), null);
-    }
+    addChild(node, low);
     return true;
   }
 
   /**
-   * Add new INode to the file tree.
-   * Find the parent and insert 
-   * 
-   * @param path file path
-   * @param newNode INode to be added
-   * @return false if the node already exists; otherwise, return true;
-   * @throws FileNotFoundException if parent does not exist or 
-   * @throws UnresolvedLinkException if any path component is a symbolic link
-   * is not a directory.
-   */
-  boolean addINode(String path, INode newNode
-      ) throws FileNotFoundException, PathIsNotDirectoryException,
-      UnresolvedLinkException {
-    byte[][] pathComponents = getPathComponents(path);        
-    if (pathComponents.length < 2) { // add root
-      return false;
+   * Add the node to the children list at the given insertion point.
+   * The basic add method which actually calls children.add(..).
+   */
+  private void addChild(final INode node, final int insertionPoint) {
+    if (children == null) {
+      children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
+    }
+    node.parent = this;
+    children.add(-insertionPoint - 1, node);
+
+    if (node.getGroupName() == null) {
+      node.setGroup(getGroupName());
     }
-    newNode.setLocalName(pathComponents[pathComponents.length - 1]);
-    // insert into the parent children list
-    final INodesInPath iip =  getExistingPathINodes(pathComponents, 2, false);
-    final INodeDirectory parent = INodeDirectory.valueOf(iip.getINode(0),
-        pathComponents);
-    return parent.addChild(newNode, true, iip.getLatestSnapshot());
   }
 
   @Override
-  Quota.Counts computeQuotaUsage(Quota.Counts counts) {
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache) {
     if (children != null) {
       for (INode child : children) {
-        child.computeQuotaUsage(counts);
+        child.computeQuotaUsage(counts, useCache);
       }
     }
+
+    return computeQuotaUsage4CurrentDirectory(counts);    
+  }
+
+  /** Add quota usage for this inode excluding children. */
+  public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
     counts.add(Quota.NAMESPACE, 1);
     return counts;    
   }
@@ -582,15 +579,15 @@ public class INodeDirectory extends INod
         : ReadOnlyList.Util.asReadOnlyList(children);
   }
 
-  /** Set the children list. */
-  public void setChildren(List<INode> children) {
-    this.children = children;
+  /** Set the children list to null. */
+  public void clearChildren() {
+    this.children = null;
   }
 
   @Override
   public void clearReferences() {
     super.clearReferences();
-    setChildren(null);
+    clearChildren();
   }
 
   /**
@@ -598,7 +595,8 @@ public class INodeDirectory extends INod
    * recursively down the subtree.
    */
   public int cleanSubtreeRecursively(final Snapshot snapshot, Snapshot prior,
-      final BlocksMapUpdateInfo collectedBlocks) {
+      final BlocksMapUpdateInfo collectedBlocks)
+          throws NSQuotaExceededException {
     int total = 0;
     // in case of deletion snapshot, since this call happens after we modify
     // the diff list, the snapshot to be deleted has been combined or renamed
@@ -624,7 +622,8 @@ public class INodeDirectory extends INod
   
   @Override
   public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
-      final BlocksMapUpdateInfo collectedBlocks) {
+      final BlocksMapUpdateInfo collectedBlocks)
+          throws NSQuotaExceededException {
     int total = 0;
     if (prior == null && snapshot == null) {
       // destroy the whole subtree and collect blocks that should be deleted
@@ -870,7 +869,7 @@ public class INodeDirectory extends INod
     super.dumpTreeRecursively(out, prefix, snapshot);
     out.print(", childrenSize=" + getChildrenList(snapshot).size());
     if (this instanceof INodeDirectoryWithQuota) {
-//      out.print(((INodeDirectoryWithQuota)this).quotaString());
+      out.print(((INodeDirectoryWithQuota)this).quotaString());
     }
     out.println();
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Thu Feb 28 03:08:08 2013
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 /**
  * Directory INode class that has a quota restriction
@@ -87,18 +86,21 @@ public class INodeDirectoryWithQuota ext
    * @param nsQuota Namespace quota to be set
    * @param dsQuota diskspace quota to be set
    */
-  public void setQuota(long nsQuota, long dsQuota, Snapshot latest) {
-    final INodeDirectoryWithQuota nodeToUpdate
-        = (INodeDirectoryWithQuota)recordModification(latest);
-    nodeToUpdate.nsQuota = nsQuota;
-    nodeToUpdate.dsQuota = dsQuota;
+  public void setQuota(long nsQuota, long dsQuota) {
+    this.nsQuota = nsQuota;
+    this.dsQuota = dsQuota;
   }
   
   @Override
-  public final Quota.Counts computeQuotaUsage(Quota.Counts counts) {
-    // use cache value
-    counts.add(Quota.NAMESPACE, namespace);
-    counts.add(Quota.DISKSPACE, diskspace);
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache) {
+    if (useCache) {
+      // use cache value
+      counts.add(Quota.NAMESPACE, namespace);
+      counts.add(Quota.DISKSPACE, diskspace);
+    } else {
+      super.computeQuotaUsage(counts, false);
+    }
     return counts;
   }
 
@@ -121,9 +123,9 @@ public class INodeDirectoryWithQuota ext
   }
   
   private void checkDiskspace(final long computed) {
-    if (-1 != getDsQuota() && diskspaceConsumed() != computed) {
+    if (-1 != getDsQuota() && diskspace != computed) {
       NameNode.LOG.error("BUG: Inconsistent diskspace for directory "
-          + getFullPathName() + ". Cached = " + diskspaceConsumed()
+          + getFullPathName() + ". Cached = " + diskspace
           + " != Computed = " + computed);
     }
   }
@@ -135,10 +137,25 @@ public class INodeDirectoryWithQuota ext
     return namespace;
   }
   
-  long diskspaceConsumed() {
-    return diskspace;
+  @Override
+  public final void addNamespaceConsumed(final int delta)
+      throws NSQuotaExceededException {
+    if (isQuotaSet()) { 
+      // The following steps are important: 
+      // check quotas in this inode and all ancestors before changing counts
+      // so that no change is made if there is any quota violation.
+
+      // (1) verify quota in this inode  
+      verifyNamespaceQuota(delta);
+      // (2) verify quota and then add count in ancestors 
+      super.addNamespaceConsumed(delta);
+      // (3) add count in this inode
+      namespace += delta;
+    } else {
+      super.addNamespaceConsumed(delta);
+    }
   }
-  
+
   /** Update the size of the tree
    * 
    * @param nsDelta the change of the tree size
@@ -162,13 +179,19 @@ public class INodeDirectoryWithQuota ext
     this.diskspace = diskspace;
   }
   
+  /** Verify if the namespace quota is violated after applying delta. */
+  void verifyNamespaceQuota(long delta) throws NSQuotaExceededException {
+    if (Quota.isViolated(nsQuota, namespace, delta)) {
+      throw new NSQuotaExceededException(nsQuota, namespace + delta);
+    }
+  }
+
   /** Verify if the namespace count disk space satisfies the quota restriction 
    * @throws QuotaExceededException if the given quota is less than the count
    */
   void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
-    if (Quota.isViolated(nsQuota, namespace, nsDelta)) {
-      throw new NSQuotaExceededException(nsQuota, namespace + nsDelta);
-    }
+    verifyNamespaceQuota(nsDelta);
+
     if (Quota.isViolated(dsQuota, diskspace, dsDelta)) {
       throw new DSQuotaExceededException(dsQuota, diskspace + dsDelta);
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Thu Feb 28 03:08:08 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@@ -132,7 +133,8 @@ public class INodeFile extends INode imp
   }
 
   @Override
-  public INodeFile recordModification(final Snapshot latest) {
+  public INodeFile recordModification(final Snapshot latest)
+      throws NSQuotaExceededException {
     return isInLatestSnapshot(latest)?
         parent.replaceChild4INodeFileWithSnapshot(this)
             .recordModification(latest)
@@ -145,7 +147,18 @@ public class INodeFile extends INode imp
    * the {@link FsAction#EXECUTE} action, if any, is ignored.
    */
   @Override
-  final INode setPermission(FsPermission permission, Snapshot latest) {
+  final void setPermission(FsPermission permission) {
+    super.setPermission(permission.applyUMask(UMASK));
+  }
+
+  /**
+   * Set the {@link FsPermission} of this {@link INodeFile}.
+   * Since this is a file,
+   * the {@link FsAction#EXECUTE} action, if any, is ignored.
+   */
+  @Override
+  final INode setPermission(FsPermission permission, Snapshot latest)
+      throws NSQuotaExceededException {
     return super.setPermission(permission.applyUMask(UMASK), latest);
   }
 
@@ -170,15 +183,19 @@ public class INodeFile extends INode imp
         : getFileReplication(null);
   }
 
-  public void setFileReplication(short replication, Snapshot latest) {
-    final INodeFile nodeToUpdate = recordModification(latest);
-    if (nodeToUpdate != this) {
-      nodeToUpdate.setFileReplication(replication, null);
-      return;
-    }
+  /** Set the replication factor of this file. */
+  public final void setFileReplication(short replication) {
     header = HeaderFormat.combineReplication(header, replication);
   }
 
+  /** Set the replication factor of this file. */
+  public final INodeFile setFileReplication(short replication, Snapshot latest)
+      throws NSQuotaExceededException {
+    final INodeFile nodeToUpdate = recordModification(latest);
+    nodeToUpdate.setFileReplication(replication);
+    return nodeToUpdate;
+  }
+
   /** @return preferred block size (in bytes) of the file. */
   @Override
   public long getPreferredBlockSize() {
@@ -253,7 +270,8 @@ public class INodeFile extends INode imp
 
   @Override
   public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
-      final BlocksMapUpdateInfo collectedBlocks) {
+      final BlocksMapUpdateInfo collectedBlocks)
+          throws NSQuotaExceededException {
     if (snapshot == null && prior == null) {   
       // this only happens when deleting the current file
       return destroyAndCollectBlocks(collectedBlocks);
@@ -282,7 +300,8 @@ public class INodeFile extends INode imp
   }
 
   @Override
-  Quota.Counts computeQuotaUsage(Quota.Counts counts) {
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache) {
     counts.add(Quota.NAMESPACE, this instanceof FileWithSnapshot?
         ((FileWithSnapshot)this).getDiffs().asList().size() + 1: 1);
     counts.add(Quota.DISKSPACE, diskspaceConsumed());
@@ -397,7 +416,7 @@ public class INodeFile extends INode imp
     return size;
   }
 
-  long diskspaceConsumed() {
+  final long diskspaceConsumed() {
     // use preferred block size for the last block if it is under construction
     return computeFileSize(true, true) * getBlockReplication();
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Thu Feb 28 03:08:08 2013
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -131,7 +132,8 @@ public class INodeFileUnderConstruction 
   }
   
   @Override
-  public INodeFileUnderConstruction recordModification(final Snapshot latest) {
+  public INodeFileUnderConstruction recordModification(final Snapshot latest)
+      throws NSQuotaExceededException {
     return isInLatestSnapshot(latest)?
         parent.replaceChild4INodeFileUcWithSnapshot(this)
             .recordModification(latest)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Thu Feb 28 03:08:08 2013
@@ -22,6 +22,7 @@ import java.io.PrintWriter;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
@@ -44,7 +45,7 @@ public class INodeSymlink extends INode 
   }
 
   @Override
-  INode recordModification(Snapshot latest) {
+  INode recordModification(Snapshot latest) throws NSQuotaExceededException {
     return isInLatestSnapshot(latest)?
         parent.saveChild2Snapshot(this, latest, new INodeSymlink(this))
         : this;
@@ -77,7 +78,8 @@ public class INodeSymlink extends INode 
   }
 
   @Override
-  Quota.Counts computeQuotaUsage(final Quota.Counts counts) {
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean updateCache) {
     counts.add(Quota.NAMESPACE, 1);
     return counts;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java Thu Feb 28 03:08:08 2013
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 
@@ -48,9 +49,11 @@ abstract class AbstractINodeDiffList<N e
     return Collections.unmodifiableList(diffs);
   }
   
-  /** clear the diff list */
-  void clear() {
+  /** clear the diff list,  */
+  int clear() {
+    final int n = diffs.size();
     diffs.clear();
+    return n;
   }
 
   /**
@@ -102,7 +105,9 @@ abstract class AbstractINodeDiffList<N e
   }
 
   /** Add an {@link AbstractINodeDiff} for the given snapshot. */
-  final D addDiff(Snapshot latest, N currentINode) {
+  final D addDiff(Snapshot latest, N currentINode)
+      throws NSQuotaExceededException {
+    currentINode.addNamespaceConsumed(1);
     return addLast(factory.createDiff(latest, currentINode));
   }
 
@@ -220,14 +225,25 @@ abstract class AbstractINodeDiffList<N e
    * Check if the latest snapshot diff exists.  If not, add it.
    * @return the latest snapshot diff, which is never null.
    */
-  final D checkAndAddLatestSnapshotDiff(Snapshot latest, N currentINode) {
+  final D checkAndAddLatestSnapshotDiff(Snapshot latest, N currentINode)
+      throws NSQuotaExceededException {
     final D last = getLast();
-    return last != null && last.snapshot.equals(latest)? last
-        : addDiff(latest, currentINode);
+    if (last != null
+        && Snapshot.ID_COMPARATOR.compare(last.getSnapshot(), latest) >= 0) {
+      return last;
+    } else {
+      try {
+        return addDiff(latest, currentINode);
+      } catch(NSQuotaExceededException e) {
+        e.setMessagePrefix("Failed to record modification for snapshot");
+        throw e;
+      }
+    }
   }
 
   /** Save the snapshot copy to the latest snapshot. */
-  public void saveSelf2Snapshot(Snapshot latest, N currentINode, N snapshotCopy) {
+  public void saveSelf2Snapshot(Snapshot latest, N currentINode, N snapshotCopy)
+      throws NSQuotaExceededException {
     if (latest != null) {
       checkAndAddLatestSnapshotDiff(latest, currentINode).saveSnapshotCopy(
           snapshotCopy, factory, currentINode);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Thu Feb 28 03:08:08 2013
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
@@ -270,7 +271,8 @@ public class INodeDirectorySnapshottable
   }
 
   /** Add a snapshot. */
-  Snapshot addSnapshot(int id, String name) throws SnapshotException {
+  Snapshot addSnapshot(int id, String name)
+      throws SnapshotException, NSQuotaExceededException {
     //check snapshot quota
     final int n = getNumSnapshots();
     if (n + 1 > snapshotQuota) {
@@ -315,7 +317,11 @@ public class INodeDirectorySnapshottable
     } else {
       final Snapshot snapshot = snapshotsByNames.remove(i);
       Snapshot prior = Snapshot.findLatestSnapshot(this, snapshot);
-      cleanSubtree(snapshot, prior, collectedBlocks);
+      try {
+        cleanSubtree(snapshot, prior, collectedBlocks);
+      } catch(NSQuotaExceededException e) {
+        LOG.error("BUG: removeSnapshot increases namespace usage.", e);
+      }
       return snapshot;
     }
   }
@@ -427,7 +433,7 @@ public class INodeDirectorySnapshottable
    * Replace itself with {@link INodeDirectoryWithSnapshot} or
    * {@link INodeDirectory} depending on the latest snapshot.
    */
-  void replaceSelf(final Snapshot latest) {
+  void replaceSelf(final Snapshot latest) throws NSQuotaExceededException {
     if (latest == null) {
       Preconditions.checkState(getLastSnapshot() == null,
           "latest == null but getLastSnapshot() != null, this=%s", this);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Thu Feb 28 03:08:08 2013
@@ -25,10 +25,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.Quota;
 import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
@@ -79,7 +81,7 @@ public class INodeDirectoryWithSnapshot 
       for (INode c : createdList) {
         removedNum += c.destroyAndCollectBlocks(collectedBlocks);
         // if c is also contained in the children list, remove it
-        currentINode.removeChild(c, null);
+        currentINode.removeChild(c);
       }
       createdList.clear();
       return removedNum;
@@ -353,7 +355,7 @@ public class INodeDirectoryWithSnapshot 
           new INodeDirectoryWithQuota(currentDir, false,
               currentDir.getNsQuota(), currentDir.getDsQuota())
         : new INodeDirectory(currentDir, false);
-      copy.setChildren(null);
+      copy.clearChildren();
       return copy;
     }
   }
@@ -462,21 +464,23 @@ public class INodeDirectoryWithSnapshot 
   }
 
   @Override
-  public INodeDirectoryWithSnapshot recordModification(final Snapshot latest) {
+  public INodeDirectoryWithSnapshot recordModification(final Snapshot latest)
+      throws NSQuotaExceededException {
     return isInLatestSnapshot(latest)?
         saveSelf2Snapshot(latest, null): this;
   }
 
   /** Save the snapshot copy to the latest snapshot. */
   public INodeDirectoryWithSnapshot saveSelf2Snapshot(
-      final Snapshot latest, final INodeDirectory snapshotCopy) {
+      final Snapshot latest, final INodeDirectory snapshotCopy)
+          throws NSQuotaExceededException {
     diffs.saveSelf2Snapshot(latest, this, snapshotCopy);
     return this;
   }
 
   @Override
   public INode saveChild2Snapshot(final INode child, final Snapshot latest,
-      final INode snapshotCopy) {
+      final INode snapshotCopy) throws NSQuotaExceededException {
     Preconditions.checkArgument(!child.isDirectory(),
         "child is a directory, child=%s", child);
     if (latest == null) {
@@ -494,7 +498,8 @@ public class INodeDirectoryWithSnapshot 
   }
 
   @Override
-  public boolean addChild(INode inode, boolean setModTime, Snapshot latest) {
+  public boolean addChild(INode inode, boolean setModTime, Snapshot latest)
+      throws NSQuotaExceededException {
     ChildrenDiff diff = null;
     Integer undoInfo = null;
     if (latest != null) {
@@ -509,7 +514,8 @@ public class INodeDirectoryWithSnapshot 
   }
 
   @Override
-  public boolean removeChild(INode child, Snapshot latest) {
+  public boolean removeChild(INode child, Snapshot latest)
+      throws NSQuotaExceededException {
     ChildrenDiff diff = null;
     UndoInfo<INode> undoInfo = null;
     if (latest != null) {
@@ -605,7 +611,8 @@ public class INodeDirectoryWithSnapshot 
 
   @Override
   public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
-      final BlocksMapUpdateInfo collectedBlocks) {
+      final BlocksMapUpdateInfo collectedBlocks)
+          throws NSQuotaExceededException {
     int n = 0;
     if (snapshot == null) { // delete the current directory
       recordModification(prior);
@@ -637,12 +644,25 @@ public class INodeDirectoryWithSnapshot 
     for (DirectoryDiff diff : diffs) {
       total += diff.destroyAndCollectBlocks(this, collectedBlocks);
     }
-    diffs.clear();
+    total += diffs.clear();
     total += super.destroyAndCollectBlocks(collectedBlocks);
     return total;
   }
 
   @Override
+  public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
+    super.computeQuotaUsage4CurrentDirectory(counts);
+
+    for(DirectoryDiff d : diffs) {
+      for(INode deleted : d.getChildrenDiff().getDeletedList()) {
+        deleted.computeQuotaUsage(counts, false);
+      }
+    }
+    counts.add(Quota.NAMESPACE, diffs.asList().size());
+    return counts;
+  }
+
+  @Override
   public Content.CountsMap computeContentSummary(
       final Content.CountsMap countsMap) {
     super.computeContentSummary(countsMap);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java Thu Feb 28 03:08:08 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
@@ -75,8 +76,8 @@ public class INodeFileUnderConstructionW
     assertAllBlocksComplete();
     final long atime = getModificationTime();
     final INodeFileWithSnapshot f = new INodeFileWithSnapshot(this, getDiffs());
-    f.setModificationTime(mtime, null);
-    f.setAccessTime(atime, null);
+    f.setModificationTime(mtime);
+    f.setAccessTime(atime);
     return f;
   }
 
@@ -92,7 +93,7 @@ public class INodeFileUnderConstructionW
 
   @Override
   public INodeFileUnderConstructionWithSnapshot recordModification(
-      final Snapshot latest) {
+      final Snapshot latest) throws NSQuotaExceededException {
     if (isInLatestSnapshot(latest)) {
       diffs.saveSelf2Snapshot(latest, this, null);
     }
@@ -111,7 +112,8 @@ public class INodeFileUnderConstructionW
 
   @Override
   public int cleanSubtree(final Snapshot snapshot, Snapshot prior,
-      final BlocksMapUpdateInfo collectedBlocks) {
+      final BlocksMapUpdateInfo collectedBlocks)
+          throws NSQuotaExceededException {
     if (snapshot == null) { // delete the current file
       recordModification(prior);
       isCurrentFileDeleted = true;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java Thu Feb 28 03:08:08 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 
@@ -64,7 +65,8 @@ public class INodeFileWithSnapshot exten
   }
 
   @Override
-  public INodeFileWithSnapshot recordModification(final Snapshot latest) {
+  public INodeFileWithSnapshot recordModification(final Snapshot latest)
+      throws NSQuotaExceededException {
     if (isInLatestSnapshot(latest)) {
       diffs.saveSelf2Snapshot(latest, this, null);
     }
@@ -83,7 +85,8 @@ public class INodeFileWithSnapshot exten
 
   @Override
   public int cleanSubtree(final Snapshot snapshot, Snapshot prior, 
-      final BlocksMapUpdateInfo collectedBlocks) {
+      final BlocksMapUpdateInfo collectedBlocks)
+          throws NSQuotaExceededException {
     if (snapshot == null) { // delete the current file
       recordModification(prior);
       isCurrentFileDeleted = true;
@@ -98,8 +101,8 @@ public class INodeFileWithSnapshot exten
   public int destroyAndCollectBlocks(
       final BlocksMapUpdateInfo collectedBlocks) {
     Preconditions.checkState(this.isCurrentFileDeleted);
-    diffs.clear();
-    return super.destroyAndCollectBlocks(collectedBlocks);
+    final int n = diffs.clear();
+    return n + super.destroyAndCollectBlocks(collectedBlocks);
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Thu Feb 28 03:08:08 2013
@@ -129,6 +129,8 @@ public class SnapshotManager implements 
     final INodesInPath i = fsdir.getINodesInPath4Write(path);
     final INodeDirectorySnapshottable srcRoot
         = INodeDirectorySnapshottable.valueOf(i.getLastINode(), path);
+
+    fsdir.verifyMaxComponentLength(snapshotName, path, 0);
     srcRoot.addSnapshot(snapshotCounter, snapshotName);
       
     //create success, update id

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java Thu Feb 28 03:08:08 2013
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -59,7 +57,6 @@ public class TestFSImageWithSnapshot {
   static final int BLOCKSIZE = 1024;
   static final long txid = 1;
 
-  private final Path rootDir = new Path("/");
   private final Path dir = new Path("/TestSnapshot");
   private static String testDir =
       System.getProperty("test.build.data", "build/test/data");
@@ -114,10 +111,7 @@ public class TestFSImageWithSnapshot {
    */
   private File dumpTree2File(String fileSuffix) throws IOException {
     File file = getDumpTreeFile(testDir, fileSuffix);
-    PrintWriter out = new PrintWriter(new FileWriter(file, false), true);
-    fsn.getFSDirectory().getINode(rootDir.toString())
-        .dumpTreeRecursively(out, new StringBuilder(), null);
-    out.close();
+    SnapshotTestHelper.dumpTree2File(fsn.getFSDirectory(), file);
     return file;
   }
   
@@ -155,6 +149,8 @@ public class TestFSImageWithSnapshot {
     fsn.getFSDirectory().writeLock();
     try {
       loader.load(imageFile);
+      FSImage.updateCountForQuota(
+          (INodeDirectoryWithQuota)fsn.getFSDirectory().getINode("/"));
     } finally {
       fsn.getFSDirectory().writeUnlock();
       fsn.writeUnlock();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java Thu Feb 28 03:08:08 2013
@@ -160,7 +160,7 @@ public class TestFsLimits {
     Class<?> generated = null;
     try {
       fs.verifyFsLimits(inodes, 1, child);
-      rootInode.addChild(child, false, null);
+      rootInode.addChild(child);
     } catch (QuotaExceededException e) {
       generated = e.getClass();
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java Thu Feb 28 03:08:08 2013
@@ -149,11 +149,11 @@ public class TestINodeFile {
     assertEquals("f", inf.getFullPathName());
     assertEquals("", inf.getLocalParentDir());
 
-    dir.addChild(inf, false, null);
+    dir.addChild(inf);
     assertEquals("d"+Path.SEPARATOR+"f", inf.getFullPathName());
     assertEquals("d", inf.getLocalParentDir());
     
-    root.addChild(dir, false, null);
+    root.addChild(dir);
     assertEquals(Path.SEPARATOR+"d"+Path.SEPARATOR+"f", inf.getFullPathName());
     assertEquals(Path.SEPARATOR+"d", dir.getFullPathName());
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java Thu Feb 28 03:08:08 2013
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -128,6 +130,8 @@ public class SnapshotTestHelper {
     assertTrue(hdfs.exists(snapshotRoot));
     hdfs.allowSnapshot(snapshotRoot.toString());
     hdfs.createSnapshot(snapshotRoot, snapshotName);
+    // set quota to a large value for testing counts
+    hdfs.setQuota(snapshotRoot, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
     return SnapshotTestHelper.getSnapshotRoot(snapshotRoot, snapshotName);
   }
 
@@ -192,6 +196,11 @@ public class SnapshotTestHelper {
   }
   private static void compareDumpedTreeInFile(File file1, File file2,
       boolean print) throws IOException {
+    if (print) {
+      printFile(file1);
+      printFile(file2);
+    }
+
     BufferedReader reader1 = new BufferedReader(new FileReader(file1));
     BufferedReader reader2 = new BufferedReader(new FileReader(file2));
     try {
@@ -238,6 +247,25 @@ public class SnapshotTestHelper {
     }
   }
 
+  static void printFile(File f) throws IOException {
+    System.out.println();
+    System.out.println("File: " + f);
+    BufferedReader in = new BufferedReader(new FileReader(f));
+    try {
+      for(String line; (line = in.readLine()) != null; ) {
+        System.out.println(line);
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  public static void dumpTree2File(FSDirectory fsdir, File f) throws IOException{
+    final PrintWriter out = new PrintWriter(new FileWriter(f, false), true);
+    fsdir.getINode("/").dumpTreeRecursively(out, new StringBuilder(), null);
+    out.close();
+  }
+
   /**
    * Generate the path for a snapshot file.
    * 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java?rev=1451081&r1=1451080&r2=1451081&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java Thu Feb 28 03:08:08 2013
@@ -31,8 +31,11 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.ipc.RemoteException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -165,6 +168,65 @@ public class TestNestedSnapshots {
     }
   }
 
+  @Test
+  public void testSnapshotWithQuota() throws Exception {
+    final String dirStr = "/testSnapshotWithQuota/dir";
+    final Path dir = new Path(dirStr);
+    hdfs.mkdirs(dir, new FsPermission((short)0777));
+    hdfs.allowSnapshot(dirStr);
+
+    // set namespace quota
+    final int NS_QUOTA = 6;
+    hdfs.setQuota(dir, NS_QUOTA, HdfsConstants.QUOTA_DONT_SET);
+
+    // create object to use up the quota.
+    final Path foo = new Path(dir, "foo");
+    final Path f1 = new Path(foo, "f1");
+    DFSTestUtil.createFile(hdfs, f1, BLOCKSIZE, REPLICATION, SEED);
+    hdfs.createSnapshot(dir, "s0");
+    final Path f2 = new Path(foo, "f2");
+    DFSTestUtil.createFile(hdfs, f2, BLOCKSIZE, REPLICATION, SEED);
+    
+    try {
+      // normal create file should fail with quota
+      final Path f3 = new Path(foo, "f3");
+      DFSTestUtil.createFile(hdfs, f3, BLOCKSIZE, REPLICATION, SEED);
+      Assert.fail();
+    } catch(NSQuotaExceededException e) {
+      SnapshotTestHelper.LOG.info("The exception is expected.", e);
+    }
+
+    try {
+      // createSnapshot should fail with quota
+      hdfs.createSnapshot(dir, "s1");
+      Assert.fail();
+    } catch(RemoteException re) {
+      final IOException ioe = re.unwrapRemoteException();
+      if (ioe instanceof NSQuotaExceededException) {
+        SnapshotTestHelper.LOG.info("The exception is expected.", ioe);
+      }
+    }
+
+    try {
+      // setPermission f1 should fail with quote since it cannot add diff.
+      hdfs.setPermission(f1, new FsPermission((short)0));
+      Assert.fail();
+    } catch(RemoteException re) {
+      final IOException ioe = re.unwrapRemoteException();
+      if (ioe instanceof NSQuotaExceededException) {
+        SnapshotTestHelper.LOG.info("The exception is expected.", ioe);
+      }
+    }
+
+    // setPermission f2 since it was created after the snapshot
+    hdfs.setPermission(f2, new FsPermission((short)0));
+
+    // increase quota and retry the commands.
+    hdfs.setQuota(dir, NS_QUOTA + 2, HdfsConstants.QUOTA_DONT_SET);
+    hdfs.createSnapshot(dir, "s1");
+    hdfs.setPermission(foo, new FsPermission((short)0444));
+  }
+
   /**
    * Test {@link Snapshot#ID_COMPARATOR}.
    */



Mime
View raw message