hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1480838 [6/7] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/...
Date: Thu, 09 May 2013 23:55:52 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Thu May  9 23:55:49 2013
@@ -21,20 +21,30 @@ import java.io.FileNotFoundException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * Directory INode class.
  */
-class INodeDirectory extends INode {
+public class INodeDirectory extends INodeWithAdditionalFields {
   /** Cast INode to INodeDirectory. */
   public static INodeDirectory valueOf(INode inode, Object path
       ) throws FileNotFoundException, PathIsNotDirectoryException {
@@ -45,219 +55,318 @@ class INodeDirectory extends INode {
     if (!inode.isDirectory()) {
       throw new PathIsNotDirectoryException(DFSUtil.path2String(path));
     }
-    return (INodeDirectory)inode; 
+    return inode.asDirectory(); 
   }
 
   protected static final int DEFAULT_FILES_PER_DIRECTORY = 5;
-  final static String ROOT_NAME = "";
+  final static byte[] ROOT_NAME = DFSUtil.string2Bytes("");
 
   private List<INode> children = null;
 
-  INodeDirectory(long id, String name, PermissionStatus permissions) {
-    super(id, name, permissions);
-  }
-
-  public INodeDirectory(long id, PermissionStatus permissions, long mTime) {
-    super(id, permissions, mTime, 0);
-  }
-  
   /** constructor */
-  INodeDirectory(long id, byte[] name, PermissionStatus permissions, long mtime) {
-    super(id, name, permissions, null, mtime, 0L);
+  public INodeDirectory(long id, byte[] name, PermissionStatus permissions,
+      long mtime) {
+    super(id, name, permissions, mtime, 0L);
   }
   
-  /** copy constructor
-   * 
-   * @param other
+  /**
+   * Copy constructor
+   * @param other The INodeDirectory to be copied
+   * @param adopt Indicate whether or not to set the parent field of child
+   *              INodes to the new node
    */
-  INodeDirectory(INodeDirectory other) {
+  public INodeDirectory(INodeDirectory other, boolean adopt) {
     super(other);
     this.children = other.children;
-    if (this.children != null) {
+    if (adopt && this.children != null) {
       for (INode child : children) {
-        child.parent = this;
+        child.setParent(this);
       }
     }
   }
-  
+
   /** @return true unconditionally. */
   @Override
   public final boolean isDirectory() {
     return true;
   }
 
-  private void assertChildrenNonNull() {
-    if (children == null) {
-      throw new AssertionError("children is null: " + this);
-    }
+  /** @return this object. */
+  @Override
+  public final INodeDirectory asDirectory() {
+    return this;
+  }
+
+  /** Is this a snapshottable directory? */
+  public boolean isSnapshottable() {
+    return false;
   }
 
-  private int searchChildren(INode inode) {
-    return Collections.binarySearch(children, inode.getLocalNameBytes());
+  private int searchChildren(byte[] name) {
+    return children == null? -1: Collections.binarySearch(children, name);
   }
 
-  INode removeChild(INode node) {
-    assertChildrenNonNull();
-    final int i = searchChildren(node);
-    return i >= 0? children.remove(i): null;
+  /**
+   * Remove the specified child from this directory.
+   * 
+   * @param child the child inode to be removed
+   * @param latest See {@link INode#recordModification(Snapshot, INodeMap)}.
+   */
+  public boolean removeChild(INode child, Snapshot latest,
+      final INodeMap inodeMap) throws QuotaExceededException {
+    if (isInLatestSnapshot(latest)) {
+      return replaceSelf4INodeDirectoryWithSnapshot(inodeMap)
+          .removeChild(child, latest, inodeMap);
+    }
+
+    return removeChild(child);
   }
 
-  /** Replace a child that has the same name as newChild by newChild.
+  /** 
+   * Remove the specified child from this directory.
+   * The basic remove method which actually calls children.remove(..).
+   *
+   * @param child the child inode to be removed
    * 
-   * @param newChild Child node to be added
+   * @return true if the child is removed; false if the child is not found.
    */
-  void replaceChild(INode newChild) {
-    assertChildrenNonNull();
+  protected final boolean removeChild(final INode child) {
+    final int i = searchChildren(child.getLocalNameBytes());
+    if (i < 0) {
+      return false;
+    }
 
-    final int low = searchChildren(newChild);
-    if (low>=0) { // an old child exists so replace by the newChild
-      children.get(low).parent = null;
-      children.set(low, newChild);
+    final INode removed = children.remove(i);
+    Preconditions.checkState(removed == child);
+    return true;
+  }
+
+  /**
+   * Replace itself with {@link INodeDirectoryWithQuota} or
+   * {@link INodeDirectoryWithSnapshot} depending on the latest snapshot.
+   */
+  INodeDirectoryWithQuota replaceSelf4Quota(final Snapshot latest,
+      final long nsQuota, final long dsQuota, final INodeMap inodeMap)
+      throws QuotaExceededException {
+    Preconditions.checkState(!(this instanceof INodeDirectoryWithQuota),
+        "this is already an INodeDirectoryWithQuota, this=%s", this);
+
+    if (!this.isInLatestSnapshot(latest)) {
+      final INodeDirectoryWithQuota q = new INodeDirectoryWithQuota(
+          this, true, nsQuota, dsQuota);
+      replaceSelf(q, inodeMap);
+      return q;
     } else {
-      throw new IllegalArgumentException("No child exists to be replaced");
+      final INodeDirectoryWithSnapshot s = new INodeDirectoryWithSnapshot(this);
+      s.setQuota(nsQuota, dsQuota);
+      return replaceSelf(s, inodeMap).saveSelf2Snapshot(latest, this);
+    }
+  }
+  /** Replace itself with an {@link INodeDirectorySnapshottable}. */
+  public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
+      Snapshot latest, final INodeMap inodeMap) throws QuotaExceededException {
+    Preconditions.checkState(!(this instanceof INodeDirectorySnapshottable),
+        "this is already an INodeDirectorySnapshottable, this=%s", this);
+    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(this);
+    replaceSelf(s, inodeMap).saveSelf2Snapshot(latest, this);
+    return s;
+  }
+
+  /** Replace itself with an {@link INodeDirectoryWithSnapshot}. */
+  public INodeDirectoryWithSnapshot replaceSelf4INodeDirectoryWithSnapshot(
+      final INodeMap inodeMap) {
+    return replaceSelf(new INodeDirectoryWithSnapshot(this), inodeMap);
+  }
+
+  /** Replace itself with {@link INodeDirectory}. */
+  public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
+    Preconditions.checkState(getClass() != INodeDirectory.class,
+        "the class is already INodeDirectory, this=%s", this);
+    return replaceSelf(new INodeDirectory(this, true), inodeMap);
+  }
+
+  /** Replace itself with the given directory. */
+  private final <N extends INodeDirectory> N replaceSelf(final N newDir,
+      final INodeMap inodeMap) {
+    final INodeReference ref = getParentReference();
+    if (ref != null) {
+      ref.setReferredINode(newDir);
+      if (inodeMap != null) {
+        inodeMap.put(newDir);
+      }
+    } else {
+      final INodeDirectory parent = getParent();
+      Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
+      parent.replaceChild(this, newDir, inodeMap);
+    }
+    clear();
+    return newDir;
+  }
+
+  /** Replace the given child with a new child. */
+  public void replaceChild(INode oldChild, final INode newChild,
+      final INodeMap inodeMap) {
+    Preconditions.checkNotNull(children);
+    final int i = searchChildren(newChild.getLocalNameBytes());
+    Preconditions.checkState(i >= 0);
+    Preconditions.checkState(oldChild == children.get(i)
+        || oldChild == children.get(i).asReference().getReferredINode()
+            .asReference().getReferredINode());
+    oldChild = children.get(i);
+    
+    if (oldChild.isReference() && !newChild.isReference()) {
+      // replace the referred inode, e.g., 
+      // INodeFileWithSnapshot -> INodeFileUnderConstructionWithSnapshot
+      final INode withCount = oldChild.asReference().getReferredINode();
+      withCount.asReference().setReferredINode(newChild);
+    } else {
+      if (oldChild.isReference()) {
+        // both are reference nodes, e.g., DstReference -> WithName
+        final INodeReference.WithCount withCount = 
+            (WithCount) oldChild.asReference().getReferredINode();
+        withCount.removeReference(oldChild.asReference());
+      }
+      children.set(i, newChild);
+    }
+    // update the inodeMap
+    if (inodeMap != null) {
+      inodeMap.put(newChild);
     }
   }
+
+  INodeReference.WithName replaceChild4ReferenceWithName(INode oldChild,
+      Snapshot latest) {
+    Preconditions.checkArgument(latest != null);
+    if (oldChild instanceof INodeReference.WithName) {
+      return (INodeReference.WithName)oldChild;
+    }
+
+    final INodeReference.WithCount withCount;
+    if (oldChild.isReference()) {
+      Preconditions.checkState(oldChild instanceof INodeReference.DstReference);
+      withCount = (INodeReference.WithCount) oldChild.asReference()
+          .getReferredINode();
+    } else {
+      withCount = new INodeReference.WithCount(null, oldChild);
+    }
+    final INodeReference.WithName ref = new INodeReference.WithName(this,
+        withCount, oldChild.getLocalNameBytes(), latest.getId());
+    replaceChild(oldChild, ref, null);
+    return ref;
+  }
   
-  INode getChild(String name) {
-    return getChildINode(DFSUtil.string2Bytes(name));
+  private void replaceChildFile(final INodeFile oldChild,
+      final INodeFile newChild, final INodeMap inodeMap) {
+    replaceChild(oldChild, newChild, inodeMap);
+    oldChild.clear();
+    newChild.updateBlockCollection();
+  }
+
+  /** Replace a child {@link INodeFile} with an {@link INodeFileWithSnapshot}. */
+  INodeFileWithSnapshot replaceChild4INodeFileWithSnapshot(
+      final INodeFile child, final INodeMap inodeMap) {
+    Preconditions.checkArgument(!(child instanceof INodeFileWithSnapshot),
+        "Child file is already an INodeFileWithSnapshot, child=" + child);
+    final INodeFileWithSnapshot newChild = new INodeFileWithSnapshot(child);
+    replaceChildFile(child, newChild, inodeMap);
+    return newChild;
+  }
+
+  /** Replace a child {@link INodeFile} with an {@link INodeFileUnderConstructionWithSnapshot}. */
+  INodeFileUnderConstructionWithSnapshot replaceChild4INodeFileUcWithSnapshot(
+      final INodeFileUnderConstruction child, final INodeMap inodeMap) {
+    Preconditions.checkArgument(!(child instanceof INodeFileUnderConstructionWithSnapshot),
+        "Child file is already an INodeFileUnderConstructionWithSnapshot, child=" + child);
+    final INodeFileUnderConstructionWithSnapshot newChild
+        = new INodeFileUnderConstructionWithSnapshot(child, null);
+    replaceChildFile(child, newChild, inodeMap);
+    return newChild;
   }
 
-  private INode getChildINode(byte[] name) {
-    if (children == null) {
-      return null;
-    }
-    int low = Collections.binarySearch(children, name);
-    if (low >= 0) {
-      return children.get(low);
+  @Override
+  public INodeDirectory recordModification(Snapshot latest,
+      final INodeMap inodeMap) throws QuotaExceededException {
+    if (isInLatestSnapshot(latest)) {
+      return replaceSelf4INodeDirectoryWithSnapshot(inodeMap)
+          .recordModification(latest, inodeMap);
+    } else {
+      return this;
     }
-    return null;
   }
 
   /**
-   * @return the INode of the last component in components, or null if the last
-   * component does not exist.
+   * Save the child to the latest snapshot.
+   * 
+   * @return the child inode, which may be replaced.
    */
-  private INode getNode(byte[][] components, boolean resolveLink
-      ) throws UnresolvedLinkException {
-    INodesInPath inodesInPath = getExistingPathINodes(components, 1,
-        resolveLink);
-    return inodesInPath.inodes[0];
+  public INode saveChild2Snapshot(final INode child, final Snapshot latest,
+      final INode snapshotCopy, final INodeMap inodeMap)
+      throws QuotaExceededException {
+    if (latest == null) {
+      return child;
+    }
+    return replaceSelf4INodeDirectoryWithSnapshot(inodeMap)
+        .saveChild2Snapshot(child, latest, snapshotCopy, inodeMap);
   }
 
   /**
-   * This is the external interface
-   */
+   * @param name the name of the child
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current directory.
+   * @return the child inode.
+   */
+  public INode getChild(byte[] name, Snapshot snapshot) {
+    final ReadOnlyList<INode> c = getChildrenList(snapshot);
+    final int i = ReadOnlyList.Util.binarySearch(c, name);
+    return i < 0? null: c.get(i);
+  }
+
+  /** @return the {@link INodesInPath} containing only the last inode. */
+  INodesInPath getLastINodeInPath(String path, boolean resolveLink
+      ) throws UnresolvedLinkException {
+    return INodesInPath.resolve(this, getPathComponents(path), 1, resolveLink);
+  }
+
+  /** @return the {@link INodesInPath} containing all inodes in the path. */
+  INodesInPath getINodesInPath(String path, boolean resolveLink
+      ) throws UnresolvedLinkException {
+    final byte[][] components = getPathComponents(path);
+    return INodesInPath.resolve(this, components, components.length, resolveLink);
+  }
+
+  /** @return the last inode in the path. */
   INode getNode(String path, boolean resolveLink) 
     throws UnresolvedLinkException {
-    return getNode(getPathComponents(path), resolveLink);
+    return getLastINodeInPath(path, resolveLink).getINode(0);
   }
 
   /**
-   * Retrieve existing INodes from a path. If existing is big enough to store
-   * all path components (existing and non-existing), then existing INodes
-   * will be stored starting from the root INode into existing[0]; if
-   * existing is not big enough to store all path components, then only the
-   * last existing and non existing INodes will be stored so that
-   * existing[existing.length-1] refers to the INode of the final component.
-   * 
-   * An UnresolvedPathException is always thrown when an intermediate path 
-   * component refers to a symbolic link. If the final path component refers 
-   * to a symbolic link then an UnresolvedPathException is only thrown if
-   * resolveLink is true.  
-   * 
-   * <p>
-   * Example: <br>
-   * Given the path /c1/c2/c3 where only /c1/c2 exists, resulting in the
-   * following path components: ["","c1","c2","c3"],
-   * 
-   * <p>
-   * <code>getExistingPathINodes(["","c1","c2"], [?])</code> should fill the
-   * array with [c2] <br>
-   * <code>getExistingPathINodes(["","c1","c2","c3"], [?])</code> should fill the
-   * array with [null]
-   * 
-   * <p>
-   * <code>getExistingPathINodes(["","c1","c2"], [?,?])</code> should fill the
-   * array with [c1,c2] <br>
-   * <code>getExistingPathINodes(["","c1","c2","c3"], [?,?])</code> should fill
-   * the array with [c2,null]
-   * 
-   * <p>
-   * <code>getExistingPathINodes(["","c1","c2"], [?,?,?,?])</code> should fill
-   * the array with [rootINode,c1,c2,null], <br>
-   * <code>getExistingPathINodes(["","c1","c2","c3"], [?,?,?,?])</code> should
-   * fill the array with [rootINode,c1,c2,null]
-   * 
-   * @param components array of path component name
-   * @param numOfINodes number of INodes to return
-   * @param resolveLink indicates whether UnresolvedLinkException should
-   *        be thrown when the path refers to a symbolic link.
-   * @return the specified number of existing INodes in the path
-   */
-  INodesInPath getExistingPathINodes(byte[][] components, int numOfINodes,
-      boolean resolveLink)
-      throws UnresolvedLinkException {
-    assert this.compareTo(components[0]) == 0 :
-        "Incorrect name " + getLocalName() + " expected "
-        + (components[0] == null? null: DFSUtil.bytes2String(components[0]));
-
-    INodesInPath existing = new INodesInPath(numOfINodes);
-    INode curNode = this;
-    int count = 0;
-    int index = numOfINodes - components.length;
-    if (index > 0) {
-      index = 0;
-    }
-    while (count < components.length && curNode != null) {
-      final boolean lastComp = (count == components.length - 1);      
-      if (index >= 0) {
-        existing.inodes[index] = curNode;
-      }
-      if (curNode.isSymlink() && (!lastComp || (lastComp && resolveLink))) {
-        final String path = constructPath(components, 0, components.length);
-        final String preceding = constructPath(components, 0, count);
-        final String remainder =
-          constructPath(components, count + 1, components.length);
-        final String link = DFSUtil.bytes2String(components[count]);
-        final String target = ((INodeSymlink)curNode).getSymlinkString();
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("UnresolvedPathException " +
-            " path: " + path + " preceding: " + preceding +
-            " count: " + count + " link: " + link + " target: " + target +
-            " remainder: " + remainder);
-        }
-        throw new UnresolvedPathException(path, preceding, remainder, target);
-      }
-      if (lastComp || !curNode.isDirectory()) {
-        break;
-      }
-      INodeDirectory parentDir = (INodeDirectory)curNode;
-      curNode = parentDir.getChildINode(components[count + 1]);
-      count++;
-      index++;
-    }
-    return existing;
+   * @return the INode of the last component in src, or null if the last
+   * component does not exist.
+   * @throws UnresolvedLinkException if symlink can't be resolved
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  INode getINode4Write(String src, boolean resolveLink)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    return getINodesInPath4Write(src, resolveLink).getLastINode();
   }
 
   /**
-   * Retrieve the existing INodes along the given path. The first INode
-   * always exist and is this INode.
-   * 
-   * @param path the path to explore
-   * @param resolveLink indicates whether UnresolvedLinkException should 
-   *        be thrown when the path refers to a symbolic link.
-   * @return INodes array containing the existing INodes in the order they
-   *         appear when following the path from the root INode to the
-   *         deepest INodes. The array size will be the number of expected
-   *         components in the path, and non existing components will be
-   *         filled with null
-   *         
-   * @see #getExistingPathINodes(byte[][], int, boolean)
-   */
-  INodesInPath getExistingPathINodes(String path, boolean resolveLink) 
-    throws UnresolvedLinkException {
-    byte[][] components = getPathComponents(path);
-    return getExistingPathINodes(components, components.length, resolveLink);
+   * @return the INodesInPath of the components in src
+   * @throws UnresolvedLinkException if symlink can't be resolved
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  INodesInPath getINodesInPath4Write(String src, boolean resolveLink)
+      throws UnresolvedLinkException, SnapshotAccessControlException {
+    final byte[][] components = INode.getPathComponents(src);
+    INodesInPath inodesInPath = INodesInPath.resolve(this, components,
+        components.length, resolveLink);
+    if (inodesInPath.isSnapshot()) {
+      throw new SnapshotAccessControlException(
+          "Modification on a read-only snapshot is disallowed");
+    }
+    return inodesInPath;
   }
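
The snapshot-aware mutation methods above (removeChild, recordModification and the replaceSelf4* family) follow a copy-on-write pattern: a directory that is captured in the latest snapshot is first swapped for a snapshot-recording variant, and the mutation is then delegated to the replacement. A minimal standalone sketch of that delegation, using hypothetical Dir/SnapshotAwareDir classes rather than the HDFS types:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch only: mutate in place unless this directory is
    // captured in the latest snapshot; in that case swap in a variant that
    // records the old state first, then let the variant do the mutation.
    class Dir {
      List<String> children = new ArrayList<>();
      boolean inLatestSnapshot;        // stands in for isInLatestSnapshot(latest)

      boolean removeChild(String name) {
        if (inLatestSnapshot) {
          return replaceSelfWithSnapshotAware().removeChild(name);
        }
        return children.remove(name);
      }

      SnapshotAwareDir replaceSelfWithSnapshotAware() {
        SnapshotAwareDir s = new SnapshotAwareDir();
        s.children = children;         // the replacement adopts the children
        // a real implementation would also fix up the parent reference and
        // the inode map, as replaceSelf(..) does above
        return s;
      }
    }

    class SnapshotAwareDir extends Dir {
      final List<String> removedFromSnapshot = new ArrayList<>();

      @Override
      boolean removeChild(String name) {
        removedFromSnapshot.add(name); // keep the name visible to snapshot reads
        return children.remove(name);
      }
    }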
 
   /**
@@ -266,11 +375,11 @@ class INodeDirectory extends INode {
    * @param name a child's name
    * @return the index of the next child
    */
-  int nextChild(byte[] name) {
+  static int nextChild(ReadOnlyList<INode> children, byte[] name) {
     if (name.length == 0) { // empty name
       return 0;
     }
-    int nextPos = Collections.binarySearch(children, name) + 1;
+    int nextPos = ReadOnlyList.Util.binarySearch(children, name) + 1;
     if (nextPos >= 0) {
       return nextPos;
     }
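
nextChild depends on the binary-search convention that a missing key yields -(insertionPoint) - 1, so adding 1 and negating a negative result recovers the insertion point, i.e. the index of the first child greater than name. ReadOnlyList.Util.binarySearch follows the same convention as Collections.binarySearch, so the arithmetic can be checked in isolation:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class NextChildDemo {
      // mirrors the arithmetic in nextChild(..)
      static int nextChild(List<String> children, String name) {
        if (name.isEmpty()) {        // empty name: start from the first child
          return 0;
        }
        int nextPos = Collections.binarySearch(children, name) + 1;
        if (nextPos >= 0) {
          return nextPos;            // name found at nextPos - 1
        }
        return -nextPos;             // missing: -(insertionPoint)-1 + 1 == -insertionPoint
      }

      public static void main(String[] args) {
        List<String> children = Arrays.asList("a", "c", "e");
        System.out.println(nextChild(children, "c"));  // 2, the slot after "c"
        System.out.println(nextChild(children, "b"));  // 1, where "b" would insert
        System.out.println(nextChild(children, ""));   // 0
      }
    }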
@@ -284,148 +393,171 @@ class INodeDirectory extends INode {
    * @param setModTime set modification time for the parent node
    *                   not needed when replaying the addition and 
    *                   the parent already has the proper mod time
+   * @param inodeMap update the inodeMap if the directory node gets replaced
    * @return false if the child with this name already exists; 
    *         otherwise, return true;
    */
-  boolean addChild(final INode node, final boolean setModTime) {
-    if (children == null) {
-      children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
-    }
-    final int low = searchChildren(node);
+  public boolean addChild(INode node, final boolean setModTime,
+      final Snapshot latest, final INodeMap inodeMap)
+      throws QuotaExceededException {
+    final int low = searchChildren(node.getLocalNameBytes());
     if (low >= 0) {
       return false;
     }
-    node.parent = this;
-    children.add(-low - 1, node);
-    // update modification time of the parent directory
-    if (setModTime)
-      setModificationTime(node.getModificationTime());
-    if (node.getGroupName() == null) {
-      node.setGroup(getGroupName());
+
+    if (isInLatestSnapshot(latest)) {
+      INodeDirectoryWithSnapshot sdir = 
+          replaceSelf4INodeDirectoryWithSnapshot(inodeMap);
+      boolean added = sdir.addChild(node, setModTime, latest, inodeMap);
+      return added;
+    }
+    addChild(node, low);
+    if (setModTime) {
+      // update modification time of the parent directory
+      updateModificationTime(node.getModificationTime(), latest, inodeMap);
     }
     return true;
   }
 
-  /**
-   * Add new INode to the file tree.
-   * Find the parent and insert 
-   * 
-   * @param path file path
-   * @param newNode INode to be added
-   * @return false if the node already exists; otherwise, return true;
-   * @throws FileNotFoundException if parent does not exist or 
-   * @throws UnresolvedLinkException if any path component is a symbolic link
-   * is not a directory.
-   */
-  boolean addINode(String path, INode newNode
-      ) throws FileNotFoundException, PathIsNotDirectoryException,
-      UnresolvedLinkException {
-    byte[][] pathComponents = getPathComponents(path);        
-    if (pathComponents.length < 2) { // add root
+
+  /** The same as addChild(node, false, null, null). */
+  public boolean addChild(INode node) {
+    final int low = searchChildren(node.getLocalNameBytes());
+    if (low >= 0) {
       return false;
     }
-    newNode.setLocalName(pathComponents[pathComponents.length - 1]);
-    // insert into the parent children list
-    INodeDirectory parent = getParent(pathComponents);
-    return parent.addChild(newNode, true);
+    addChild(node, low);
+    return true;
   }
 
-  INodeDirectory getParent(byte[][] pathComponents
-      ) throws FileNotFoundException, PathIsNotDirectoryException,
-      UnresolvedLinkException {
-    if (pathComponents.length < 2)  // add root
-      return null;
-    // Gets the parent INode
-    INodesInPath inodes =  getExistingPathINodes(pathComponents, 2, false);
-    return INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
+  /**
+   * Add the node to the children list at the given insertion point.
+   * The basic add method which actually calls children.add(..).
+   */
+  private void addChild(final INode node, final int insertionPoint) {
+    if (children == null) {
+      children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
+    }
+    node.setParent(this);
+    children.add(-insertionPoint - 1, node);
+
+    if (node.getGroupName() == null) {
+      node.setGroup(getGroupName());
+    }
   }
 
   @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += 1;
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
     if (children != null) {
       for (INode child : children) {
-        child.spaceConsumedInTree(counts);
+        child.computeQuotaUsage(counts, useCache, lastSnapshotId);
       }
     }
-    return counts;    
+    return computeQuotaUsage4CurrentDirectory(counts);
+  }
+  
+  /** Add quota usage for this inode excluding children. */
+  public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
+    counts.add(Quota.NAMESPACE, 1);
+    return counts;
   }
 
   @Override
-  long[] computeContentSummary(long[] summary) {
-    // Walk through the children of this node, using a new summary array
-    // for the (sub)tree rooted at this node
-    assert 4 == summary.length;
-    long[] subtreeSummary = new long[]{0,0,0,0};
-    if (children != null) {
-      for (INode child : children) {
-        child.computeContentSummary(subtreeSummary);
-      }
-    }
-    if (this instanceof INodeDirectoryWithQuota) {
-      // Warn if the cached and computed diskspace values differ
-      INodeDirectoryWithQuota node = (INodeDirectoryWithQuota)this;
-      long space = node.diskspaceConsumed();
-      assert -1 == node.getDsQuota() || space == subtreeSummary[3];
-      if (-1 != node.getDsQuota() && space != subtreeSummary[3]) {
-        NameNode.LOG.warn("Inconsistent diskspace for directory "
-            +getLocalName()+". Cached: "+space+" Computed: "+subtreeSummary[3]);
-      }
+  public Content.Counts computeContentSummary(final Content.Counts counts) {
+    for (INode child : getChildrenList(null)) {
+      child.computeContentSummary(counts);
     }
+    counts.add(Content.DIRECTORY, 1);
+    return counts;
+  }
 
-    // update the passed summary array with the values for this node's subtree
-    for (int i = 0; i < summary.length; i++) {
-      summary[i] += subtreeSummary[i];
-    }
+  /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current directory.
+   * @return the current children list if the specified snapshot is null;
+   *         otherwise, return the children list corresponding to the snapshot.
+   *         Note that the returned list is never null.
+   */
+  public ReadOnlyList<INode> getChildrenList(final Snapshot snapshot) {
+    return children == null ? ReadOnlyList.Util.<INode>emptyList()
+        : ReadOnlyList.Util.asReadOnlyList(children);
+  }
 
-    summary[2]++;
-    return summary;
+  /** Set the children list to null. */
+  public void clearChildren() {
+    this.children = null;
   }
 
-  /**
-   * @return an empty list if the children list is null;
-   *         otherwise, return the children list.
-   *         The returned list should not be modified.
-   */
-  public List<INode> getChildrenList() {
-    return children==null ? EMPTY_LIST : children;
+  @Override
+  public void clear() {
+    super.clear();
+    clearChildren();
+  }
+
+  /** Call cleanSubtree(..) recursively down the subtree. */
+  public Quota.Counts cleanSubtreeRecursively(final Snapshot snapshot,
+      Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) throws QuotaExceededException {
+    Quota.Counts counts = Quota.Counts.newInstance();
+    // in case of deletion snapshot, since this call happens after we modify
+    // the diff list, the snapshot to be deleted has been combined or renamed
+    // to its latest previous snapshot. (besides, we also need to consider nodes
+    // created after prior but before snapshot. this will be done in 
+    // INodeDirectoryWithSnapshot#cleanSubtree)
+    Snapshot s = snapshot != null && prior != null ? prior : snapshot;
+    for (INode child : getChildrenList(s)) {
+      Quota.Counts childCounts = child.cleanSubtree(snapshot, prior,
+          collectedBlocks, removedINodes);
+      counts.add(childCounts);
+    }
+    return counts;
   }
 
   @Override
-  int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
-    int total = 1;
-    if (children == null) {
-      return total;
+  public void destroyAndCollectBlocks(final BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) {
+    for (INode child : getChildrenList(null)) {
+      child.destroyAndCollectBlocks(collectedBlocks, removedINodes);
     }
-    for (INode child : children) {
-      total += child.collectSubtreeBlocksAndClear(info);
+    clear();
+    removedINodes.add(this);
+  }
+  
+  @Override
+  public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
+      final BlocksMapUpdateInfo collectedBlocks, 
+      final List<INode> removedINodes) throws QuotaExceededException {
+    if (prior == null && snapshot == null) {
+      // destroy the whole subtree and collect blocks that should be deleted
+      Quota.Counts counts = Quota.Counts.newInstance();
+      this.computeQuotaUsage(counts, true);
+      destroyAndCollectBlocks(collectedBlocks, removedINodes);
+      return counts; 
+    } else {
+      // process recursively down the subtree
+      Quota.Counts counts = cleanSubtreeRecursively(snapshot, prior,
+          collectedBlocks, removedINodes);
+      if (isQuotaSet()) {
+        ((INodeDirectoryWithQuota) this).addSpaceConsumed2Cache(
+            -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+      }
+      return counts;
     }
-    parent = null;
-    return total;
   }
   
   /**
-   * Used by
-   * {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)}.
-   * Containing INodes information resolved from a given path.
+   * Compare the metadata with another INodeDirectory
    */
-  static class INodesInPath {
-    private INode[] inodes;
-    
-    public INodesInPath(int number) {
-      assert (number >= 0);
-      this.inodes = new INode[number];
-    }
-    
-    INode[] getINodes() {
-      return inodes;
-    }
-    
-    void setINode(int i, INode inode) {
-      inodes[i] = inode;
-    }
+  public boolean metadataEquals(INodeDirectory other) {
+    return other != null && getNsQuota() == other.getNsQuota()
+        && getDsQuota() == other.getDsQuota()
+        && getUserName().equals(other.getUserName())
+        && getGroupName().equals(other.getGroupName())
+        && getFsPermission().equals(other.getFsPermission());
   }
-
+  
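
The computeQuotaUsage and cleanSubtree methods above thread a mutable Quota.Counts accumulator through the subtree instead of the old long[]/DirCounts arrays. A simplified, self-contained sketch of that accumulator pattern (the Counts and Node types here are stand-ins, not the HDFS classes):

    import java.util.ArrayList;
    import java.util.EnumMap;
    import java.util.List;

    public class QuotaUsageSketch {
      enum Quota { NAMESPACE, DISKSPACE }

      // hypothetical stand-in for Quota.Counts: one running total per quota type
      static class Counts {
        final EnumMap<Quota, Long> map = new EnumMap<>(Quota.class);
        Counts() { for (Quota q : Quota.values()) map.put(q, 0L); }
        void add(Quota q, long delta) { map.put(q, map.get(q) + delta); }
        long get(Quota q) { return map.get(q); }
      }

      static class Node {
        final List<Node> children = new ArrayList<>();
        final long diskspace;
        Node(long diskspace) { this.diskspace = diskspace; }

        // mirrors the recursion: children first, then this node's own usage
        Counts computeQuotaUsage(Counts counts) {
          for (Node child : children) {
            child.computeQuotaUsage(counts);
          }
          counts.add(Quota.NAMESPACE, 1);   // every inode costs one name
          counts.add(Quota.DISKSPACE, diskspace);
          return counts;
        }
      }

      public static void main(String[] args) {
        Node root = new Node(0);
        root.children.add(new Node(1024));
        root.children.add(new Node(2048));
        Counts c = root.computeQuotaUsage(new Counts());
        System.out.println(c.get(Quota.NAMESPACE)); // 3
        System.out.println(c.get(Quota.DISKSPACE)); // 3072
      }
    }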
   /*
    * The following code is to dump the tree recursively for testing.
    * 
@@ -441,13 +573,45 @@ class INodeDirectory extends INode {
   static final String DUMPTREE_LAST_ITEM = "\\-";
   @VisibleForTesting
   @Override
-  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
-    super.dumpTreeRecursively(out, prefix);
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
+      final Snapshot snapshot) {
+    super.dumpTreeRecursively(out, prefix, snapshot);
+    out.print(", childrenSize=" + getChildrenList(snapshot).size());
+    if (this instanceof INodeDirectoryWithQuota) {
+      out.print(((INodeDirectoryWithQuota)this).quotaString());
+    }
+    if (this instanceof Snapshot.Root) {
+      out.print(", snapshotId=" + snapshot.getId());
+    }
+    out.println();
+
     if (prefix.length() >= 2) {
       prefix.setLength(prefix.length() - 2);
       prefix.append("  ");
     }
-    dumpTreeRecursively(out, prefix, children);
+    dumpTreeRecursively(out, prefix, new Iterable<SnapshotAndINode>() {
+      final Iterator<INode> i = getChildrenList(snapshot).iterator();
+      
+      @Override
+      public Iterator<SnapshotAndINode> iterator() {
+        return new Iterator<SnapshotAndINode>() {
+          @Override
+          public boolean hasNext() {
+            return i.hasNext();
+          }
+
+          @Override
+          public SnapshotAndINode next() {
+            return new SnapshotAndINode(snapshot, i.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    });
   }
 
   /**
@@ -457,27 +621,29 @@ class INodeDirectory extends INode {
    */
   @VisibleForTesting
   protected static void dumpTreeRecursively(PrintWriter out,
-      StringBuilder prefix, List<? extends INode> subs) {
-    prefix.append(DUMPTREE_EXCEPT_LAST_ITEM);
-    if (subs != null && subs.size() != 0) {
-      int i = 0;
-      for(; i < subs.size() - 1; i++) {
-        subs.get(i).dumpTreeRecursively(out, prefix);
+      StringBuilder prefix, Iterable<SnapshotAndINode> subs) {
+    if (subs != null) {
+      for(final Iterator<SnapshotAndINode> i = subs.iterator(); i.hasNext();) {
+        final SnapshotAndINode pair = i.next();
+        prefix.append(i.hasNext()? DUMPTREE_EXCEPT_LAST_ITEM: DUMPTREE_LAST_ITEM);
+        pair.inode.dumpTreeRecursively(out, prefix, pair.snapshot);
         prefix.setLength(prefix.length() - 2);
-        prefix.append(DUMPTREE_EXCEPT_LAST_ITEM);
       }
-
-      prefix.setLength(prefix.length() - 2);
-      prefix.append(DUMPTREE_LAST_ITEM);
-      subs.get(i).dumpTreeRecursively(out, prefix);
     }
-    prefix.setLength(prefix.length() - 2);
   }
-  
-  void clearChildren() {
-    if (children != null) {
-      this.children.clear();
-      this.children = null;
+
+  /** A pair of Snapshot and INode objects. */
+  protected static class SnapshotAndINode {
+    public final Snapshot snapshot;
+    public final INode inode;
+
+    public SnapshotAndINode(Snapshot snapshot, INode inode) {
+      this.snapshot = snapshot;
+      this.inode = inode;
+    }
+
+    public SnapshotAndINode(Snapshot snapshot) {
+      this(snapshot, snapshot.getRoot());
     }
   }
 }
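
The rewritten dumpTreeRecursively picks the connector per child (DUMPTREE_EXCEPT_LAST_ITEM while siblings remain, DUMPTREE_LAST_ITEM for the final one) and restores the shared prefix after each recursive call, instead of the old fix-up-after-the-loop style. The prefix discipline can be sketched on a plain tree (Node here is a hypothetical type, not an INode):

    import java.io.PrintWriter;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class DumpTreeSketch {
      static final String EXCEPT_LAST = "+-";
      static final String LAST = "\\-";

      static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
      }

      // same prefix discipline as dumpTreeRecursively: print, blank out this
      // node's own connector, then append a per-child connector before each
      // recursive call and trim it afterwards
      static void dump(PrintWriter out, StringBuilder prefix, Node node) {
        out.println(prefix + node.name);
        if (prefix.length() >= 2) {
          prefix.setLength(prefix.length() - 2);
          prefix.append("  ");
        }
        for (Iterator<Node> i = node.children.iterator(); i.hasNext(); ) {
          Node child = i.next();
          prefix.append(i.hasNext() ? EXCEPT_LAST : LAST);
          dump(out, prefix, child);
          prefix.setLength(prefix.length() - 2);
        }
      }

      public static void main(String[] args) {
        Node root = new Node("/");
        Node a = new Node("a");
        a.children.add(new Node("x"));
        root.children.add(a);
        root.children.add(new Node("b"));
        dump(new PrintWriter(System.out, true), new StringBuilder(), root);
        // prints:
        // /
        // +-a
        //   \-x
        // \-b
      }
    }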

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Thu May  9 23:55:49 2013
@@ -23,14 +23,16 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Directory INode class that has a quota restriction
  */
-class INodeDirectoryWithQuota extends INodeDirectory {
+public class INodeDirectoryWithQuota extends INodeDirectory {
   /** Name space quota */
   private long nsQuota = Long.MAX_VALUE;
   /** Name space count */
-  private long nsCount = 1L;
+  private long namespace = 1L;
   /** Disk space quota */
   private long dsQuota = HdfsConstants.QUOTA_RESET;
   /** Disk space count */
@@ -42,35 +44,34 @@ class INodeDirectoryWithQuota extends IN
   * @param dsQuota Diskspace quota to be assigned to this inode
    * @param other The other inode from which all other properties are copied
    */
-  INodeDirectoryWithQuota(long nsQuota, long dsQuota,
-      INodeDirectory other) {
-    super(other);
-    INode.DirCounts counts = new INode.DirCounts();
-    other.spaceConsumedInTree(counts);
-    this.nsCount = counts.getNsCount();
-    this.diskspace = counts.getDsCount();
+  public INodeDirectoryWithQuota(INodeDirectory other, boolean adopt,
+      long nsQuota, long dsQuota) {
+    super(other, adopt);
+    final Quota.Counts counts = other.computeQuotaUsage();
+    this.namespace = counts.get(Quota.NAMESPACE);
+    this.diskspace = counts.get(Quota.DISKSPACE);
     this.nsQuota = nsQuota;
     this.dsQuota = dsQuota;
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(long id, PermissionStatus permissions,
+  INodeDirectoryWithQuota(long id, byte[] name, PermissionStatus permissions,
       long modificationTime, long nsQuota, long dsQuota) {
-    super(id, permissions, modificationTime);
+    super(id, name, permissions, modificationTime);
     this.nsQuota = nsQuota;
     this.dsQuota = dsQuota;
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(long id, String name, PermissionStatus permissions) {
-    super(id, name, permissions);
+  INodeDirectoryWithQuota(long id, byte[] name, PermissionStatus permissions) {
+    super(id, name, permissions, 0L);
   }
   
   /** Get this directory's namespace quota
    * @return this directory's namespace quota
    */
   @Override
-  long getNsQuota() {
+  public long getNsQuota() {
     return nsQuota;
   }
   
@@ -78,7 +79,7 @@ class INodeDirectoryWithQuota extends IN
    * @return this directory's diskspace quota
    */
   @Override
-  long getDsQuota() {
+  public long getDsQuota() {
     return dsQuota;
   }
   
@@ -86,30 +87,68 @@ class INodeDirectoryWithQuota extends IN
    * 
    * @param nsQuota Namespace quota to be set
    * @param dsQuota diskspace quota to be set
-   *                                
    */
-  void setQuota(long newNsQuota, long newDsQuota) {
-    nsQuota = newNsQuota;
-    dsQuota = newDsQuota;
+  public void setQuota(long nsQuota, long dsQuota) {
+    this.nsQuota = nsQuota;
+    this.dsQuota = dsQuota;
   }
   
-  
   @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += nsCount;
-    counts.dsCount += diskspace;
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
+    if (useCache && isQuotaSet()) {
+      // use cache value
+      counts.add(Quota.NAMESPACE, namespace);
+      counts.add(Quota.DISKSPACE, diskspace);
+    } else {
+      super.computeQuotaUsage(counts, false, lastSnapshotId);
+    }
     return counts;
   }
 
+  @Override
+  public Content.Counts computeContentSummary(
+      final Content.Counts counts) {
+    final long original = counts.get(Content.DISKSPACE);
+    super.computeContentSummary(counts);
+    checkDiskspace(counts.get(Content.DISKSPACE) - original);
+    return counts;
+  }
+  
+  private void checkDiskspace(final long computed) {
+    if (-1 != getDsQuota() && diskspace != computed) {
+      NameNode.LOG.error("BUG: Inconsistent diskspace for directory "
+          + getFullPathName() + ". Cached = " + diskspace
+          + " != Computed = " + computed);
+    }
+  }
+
   /** Get the number of names in the subtree rooted at this directory
    * @return the size of the subtree rooted at this directory
    */
   long numItemsInTree() {
-    return nsCount;
+    return namespace;
   }
   
-  long diskspaceConsumed() {
-    return diskspace;
+  @Override
+  public final void addSpaceConsumed(final long nsDelta, final long dsDelta,
+      boolean verify, int snapshotId) throws QuotaExceededException {
+    if (isQuotaSet()) { 
+      // The following steps are important: 
+      // check quotas in this inode and all ancestors before changing counts
+      // so that no change is made if there is any quota violation.
+
+      // (1) verify quota in this inode
+      if (verify) {
+        verifyQuota(nsDelta, dsDelta);
+      }
+      // (2) verify quota and then add count in ancestors 
+      super.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+      // (3) add count in this inode
+      addSpaceConsumed2Cache(nsDelta, dsDelta);
+    } else {
+      super.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+    }
   }
   
   /** Update the size of the tree
@@ -117,8 +156,9 @@ class INodeDirectoryWithQuota extends IN
    * @param nsDelta the change of the tree size
    * @param dsDelta change to disk space occupied
    */
-  void addSpaceConsumed(long nsDelta, long dsDelta) {
-    setSpaceConsumed(nsCount + nsDelta, diskspace + dsDelta);
+  protected void addSpaceConsumed2Cache(long nsDelta, long dsDelta) {
+    namespace += nsDelta;
+    diskspace += dsDelta;
   }
   
   /** 
@@ -130,23 +170,45 @@ class INodeDirectoryWithQuota extends IN
    * @param diskspace disk space take by all the nodes under this directory
    */
   void setSpaceConsumed(long namespace, long diskspace) {
-    this.nsCount = namespace;
+    this.namespace = namespace;
     this.diskspace = diskspace;
   }
   
+  /** Verify if the namespace quota is violated after applying delta. */
+  void verifyNamespaceQuota(long delta) throws NSQuotaExceededException {
+    if (Quota.isViolated(nsQuota, namespace, delta)) {
+      throw new NSQuotaExceededException(nsQuota, namespace + delta);
+    }
+  }
+
   /** Verify if the namespace count and disk space satisfy the quota restriction 
    * @throws QuotaExceededException if the given quota is less than the count
    */
   void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
-    long newCount = nsCount + nsDelta;
-    long newDiskspace = diskspace + dsDelta;
-    if (nsDelta>0 || dsDelta>0) {
-      if (nsQuota >= 0 && nsQuota < newCount) {
-        throw new NSQuotaExceededException(nsQuota, newCount);
-      }
-      if (dsQuota >= 0 && dsQuota < newDiskspace) {
-        throw new DSQuotaExceededException(dsQuota, newDiskspace);
-      }
+    verifyNamespaceQuota(nsDelta);
+
+    if (Quota.isViolated(dsQuota, diskspace, dsDelta)) {
+      throw new DSQuotaExceededException(dsQuota, diskspace + dsDelta);
     }
   }
+
+  String namespaceString() {
+    return "namespace: " + (nsQuota < 0? "-": namespace + "/" + nsQuota);
+  }
+  String diskspaceString() {
+    return "diskspace: " + (dsQuota < 0? "-": diskspace + "/" + dsQuota);
+  }
+  String quotaString() {
+    return ", Quota[" + namespaceString() + ", " + diskspaceString() + "]";
+  }
+  
+  @VisibleForTesting
+  public long getNamespace() {
+    return this.namespace;
+  }
+  
+  @VisibleForTesting
+  public long getDiskspace() {
+    return this.diskspace;
+  }
 }
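
verifyQuota and verifyNamespaceQuota now share a single predicate, Quota.isViolated, and addSpaceConsumed verifies this inode and every ancestor before any count is changed, so a failed check leaves all cached counts untouched. Judging from the inline checks being replaced, the predicate amounts to the following (a hedged sketch; the actual Quota.isViolated signature may differ):

    // Sketch of the quota predicate implied by the replaced inline checks:
    // a negative quota means "no limit", and only a growing usage
    // (delta > 0) can violate the limit.
    public class QuotaCheckSketch {
      static boolean isViolated(long quota, long usage, long delta) {
        return quota >= 0 && delta > 0 && usage + delta > quota;
      }

      public static void main(String[] args) {
        System.out.println(isViolated(-1, 100, 50)); // false: no quota set
        System.out.println(isViolated(10, 8, 1));    // false: 9 <= 10
        System.out.println(isViolated(10, 8, 3));    // true: 11 > 10
        System.out.println(isViolated(10, 12, -1));  // false: shrinking usage
      }
    }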

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Thu May  9 23:55:49 2013
@@ -19,29 +19,51 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.Util;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /** I-node for closed file. */
 @InterfaceAudience.Private
-public class INodeFile extends INode implements BlockCollection {
-  /** Cast INode to INodeFile. */
+public class INodeFile extends INodeWithAdditionalFields implements BlockCollection {
+  /** The same as valueOf(inode, path, false). */
   public static INodeFile valueOf(INode inode, String path
       ) throws FileNotFoundException {
+    return valueOf(inode, path, false);
+  }
+
+  /** Cast INode to INodeFile. */
+  public static INodeFile valueOf(INode inode, String path, boolean acceptNull)
+      throws FileNotFoundException {
     if (inode == null) {
-      throw new FileNotFoundException("File does not exist: " + path);
+      if (acceptNull) {
+        return null;
+      } else {
+        throw new FileNotFoundException("File does not exist: " + path);
+      }
     }
-    if (!(inode instanceof INodeFile)) {
+    if (!inode.isFile()) {
       throw new FileNotFoundException("Path is not a file: " + path);
     }
-    return (INodeFile)inode;
+    return inode.asFile();
   }
 
   /** Format: [16 bits for replication][48 bits for PreferredBlockSize] */
@@ -83,48 +105,134 @@ public class INodeFile extends INode imp
 
   private BlockInfo[] blocks;
 
-  INodeFile(long id, PermissionStatus permissions, BlockInfo[] blklist,
-      short replication, long modificationTime, long atime,
-      long preferredBlockSize) {
-    super(id, permissions, modificationTime, atime);
+  INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
+      BlockInfo[] blklist, short replication, long preferredBlockSize) {
+    super(id, name, permissions, mtime, atime);
     header = HeaderFormat.combineReplication(header, replication);
     header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
     this.blocks = blklist;
   }
   
+  public INodeFile(INodeFile that) {
+    super(that);
+    this.header = that.header;
+    this.blocks = that.blocks;
+  }
+
   /** @return true unconditionally. */
   @Override
   public final boolean isFile() {
     return true;
   }
 
-  /** @return the replication factor of the file. */
+  /** @return this object. */
+  @Override
+  public final INodeFile asFile() {
+    return this;
+  }
+
+  /** Is this file under construction? */
+  public boolean isUnderConstruction() {
+    return false;
+  }
+
+  /** Convert this file to an {@link INodeFileUnderConstruction}. */
+  public INodeFileUnderConstruction toUnderConstruction(
+      String clientName,
+      String clientMachine,
+      DatanodeDescriptor clientNode) {
+    Preconditions.checkState(!isUnderConstruction(),
+        "file is already an INodeFileUnderConstruction");
+    return new INodeFileUnderConstruction(this,
+        clientName, clientMachine, clientNode); 
+  }
+
+  @Override
+  public INodeFile getSnapshotINode(final Snapshot snapshot) {
+    return this;
+  }
+
   @Override
-  public short getBlockReplication() {
+  public INodeFile recordModification(final Snapshot latest,
+      final INodeMap inodeMap) throws QuotaExceededException {
+    if (isInLatestSnapshot(latest)) {
+      INodeFileWithSnapshot newFile = getParent()
+          .replaceChild4INodeFileWithSnapshot(this, inodeMap)
+          .recordModification(latest, inodeMap);
+      return newFile;
+    } else {
+      return this;
+    }
+  }
+
+  /** @return the replication factor of the file. */
+  public final short getFileReplication(Snapshot snapshot) {
+    if (snapshot != null) {
+      return getSnapshotINode(snapshot).getFileReplication();
+    }
+
     return HeaderFormat.getReplication(header);
   }
 
-  void setReplication(short replication) {
+  /** The same as getFileReplication(null). */
+  public final short getFileReplication() {
+    return getFileReplication(null);
+  }
+
+  @Override
+  public final short getBlockReplication() {
+    return this instanceof FileWithSnapshot?
+        Util.getBlockReplication((FileWithSnapshot)this)
+        : getFileReplication(null);
+  }
+
+  /** Set the replication factor of this file. */
+  public final void setFileReplication(short replication) {
     header = HeaderFormat.combineReplication(header, replication);
   }
 
+  /** Set the replication factor of this file. */
+  public final INodeFile setFileReplication(short replication, Snapshot latest,
+      final INodeMap inodeMap) throws QuotaExceededException {
+    final INodeFile nodeToUpdate = recordModification(latest, inodeMap);
+    nodeToUpdate.setFileReplication(replication);
+    return nodeToUpdate;
+  }
+
   /** @return preferred block size (in bytes) of the file. */
   @Override
   public long getPreferredBlockSize() {
     return HeaderFormat.getPreferredBlockSize(header);
   }
 
+  /** @return the diskspace required for a full block. */
+  final long getBlockDiskspace() {
+    return getPreferredBlockSize() * getBlockReplication();
+  }
+
   /** @return the blocks of the file. */
   @Override
   public BlockInfo[] getBlocks() {
     return this.blocks;
   }
 
+  void updateBlockCollection() {
+    if (blocks != null) {
+      for(BlockInfo b : blocks) {
+        b.setBlockCollection(this);
+      }
+    }
+  }
+
   /**
    * append array of blocks to this.blocks
    */
-  void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+  void concatBlocks(INodeFile[] inodes) {
     int size = this.blocks.length;
+    int totalAddedBlocks = 0;
+    for(INodeFile f : inodes) {
+      totalAddedBlocks += f.blocks.length;
+    }
     
     BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
     System.arraycopy(this.blocks, 0, newlist, 0, size);
@@ -133,11 +241,9 @@ public class INodeFile extends INode imp
       System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
       size += in.blocks.length;
     }
-    
-    for(BlockInfo bi: newlist) {
-      bi.setBlockCollection(this);
-    }
+
     setBlocks(newlist);
+    updateBlockCollection();
   }
   
   /**
@@ -166,16 +272,34 @@ public class INodeFile extends INode imp
   }
 
   @Override
-  int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
-    parent = null;
-    if(blocks != null && info != null) {
+  public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
+      final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
+      throws QuotaExceededException {
+    Quota.Counts counts = Quota.Counts.newInstance();
+    if (snapshot == null && prior == null) {   
+      // this only happens when deleting the current file
+      computeQuotaUsage(counts, false);
+      destroyAndCollectBlocks(collectedBlocks, removedINodes);
+    }
+    return counts;
+  }
+
+  @Override
+  public void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) {
+    if (blocks != null && collectedBlocks != null) {
       for (BlockInfo blk : blocks) {
-        info.addDeleteBlock(blk);
+        collectedBlocks.addDeleteBlock(blk);
         blk.setBlockCollection(null);
       }
     }
     setBlocks(null);
-    return 1;
+    clear();
+    removedINodes.add(this);
+    
+    if (this instanceof FileWithSnapshot) {
+      ((FileWithSnapshot) this).getDiffs().clear();
+    }
   }
   
   @Override
@@ -184,63 +308,144 @@ public class INodeFile extends INode imp
     return getFullPathName();
   }
 
+  @Override
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache, int lastSnapshotId) {
+    long nsDelta = 1;
+    final long dsDelta;
+    if (this instanceof FileWithSnapshot) {
+      FileDiffList fileDiffList = ((FileWithSnapshot) this).getDiffs();
+      Snapshot last = fileDiffList.getLastSnapshot();
+      List<FileDiff> diffs = fileDiffList.asList();
+
+      if (lastSnapshotId == Snapshot.INVALID_ID || last == null) {
+        nsDelta += diffs.size();
+        dsDelta = diskspaceConsumed();
+      } else if (last.getId() < lastSnapshotId) {
+        dsDelta = computeFileSize(true, false) * getFileReplication();
+      } else {      
+        Snapshot s = fileDiffList.getSnapshotById(lastSnapshotId);
+        dsDelta = diskspaceConsumed(s);
+      }
+    } else {
+      dsDelta = diskspaceConsumed();
+    }
+    counts.add(Quota.NAMESPACE, nsDelta);
+    counts.add(Quota.DISKSPACE, dsDelta);
+    return counts;
+  }
 
   @Override
-  long[] computeContentSummary(long[] summary) {
-    summary[0] += computeFileSize(true);
-    summary[1]++;
-    summary[3] += diskspaceConsumed();
-    return summary;
+  public final Content.Counts computeContentSummary(
+      final Content.Counts counts) {
+    computeContentSummary4Snapshot(counts);
+    computeContentSummary4Current(counts);
+    return counts;
+  }
+
+  private void computeContentSummary4Snapshot(final Content.Counts counts) {
+    // file length and diskspace only counted for the latest state of the file
+    // i.e. either the current state or the last snapshot
+    if (this instanceof FileWithSnapshot) {
+      final FileWithSnapshot withSnapshot = (FileWithSnapshot)this;
+      final FileDiffList diffs = withSnapshot.getDiffs();
+      final int n = diffs.asList().size();
+      counts.add(Content.FILE, n);
+      if (n > 0 && withSnapshot.isCurrentFileDeleted()) {
+        counts.add(Content.LENGTH, diffs.getLast().getFileSize());
+      }
+
+      if (withSnapshot.isCurrentFileDeleted()) {
+        final long lastFileSize = diffs.getLast().getFileSize();
+        counts.add(Content.DISKSPACE, lastFileSize * getBlockReplication());
+      }
+    }
+  }
+
+  private void computeContentSummary4Current(final Content.Counts counts) {
+    if (this instanceof FileWithSnapshot
+        && ((FileWithSnapshot)this).isCurrentFileDeleted()) {
+      return;
+    }
+
+    counts.add(Content.LENGTH, computeFileSize());
+    counts.add(Content.FILE, 1);
+    counts.add(Content.DISKSPACE, diskspaceConsumed());
+  }
+
+  /** The same as computeFileSize(null). */
+  public final long computeFileSize() {
+    return computeFileSize(null);
+  }
+
+  /**
+   * Compute file size of the current file if the given snapshot is null;
+   * otherwise, get the file size from the given snapshot.
+   */
+  public final long computeFileSize(Snapshot snapshot) {
+    if (snapshot != null && this instanceof FileWithSnapshot) {
+      final FileDiff d = ((FileWithSnapshot)this).getDiffs().getDiff(snapshot);
+      if (d != null) {
+        return d.getFileSize();
+      }
+    }
+
+    return computeFileSize(true, false);
+  }
+
+  /**
+   * Compute file size of the current file
+   * but not including the last block if it is under construction.
+   */
+  public final long computeFileSizeNotIncludingLastUcBlock() {
+    return computeFileSize(false, false);
   }
 
-  /** Compute file size.
-   * May or may not include BlockInfoUnderConstruction.
+  /**
+   * Compute file size of the current file.
+   * 
+   * @param includesLastUcBlock
+   *          If the last block is under construction, should it be included?
+   * @param usePreferredBlockSize4LastUcBlock
+   *          If the last block is under construction, should we use actual
+   *          block size or preferred block size?
+   *          Note that usePreferredBlockSize4LastUcBlock is ignored
+   *          if includesLastUcBlock == false.
+   * @return file size
    */
-  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+  public final long computeFileSize(boolean includesLastUcBlock,
+      boolean usePreferredBlockSize4LastUcBlock) {
     if (blocks == null || blocks.length == 0) {
       return 0;
     }
     final int last = blocks.length - 1;
     //check if the last block is BlockInfoUnderConstruction
-    long bytes = blocks[last] instanceof BlockInfoUnderConstruction
-                 && !includesBlockInfoUnderConstruction?
-                     0: blocks[last].getNumBytes();
+    long size = blocks[last].getNumBytes();
+    if (blocks[last] instanceof BlockInfoUnderConstruction) {
+      if (!includesLastUcBlock) {
+        size = 0;
+      } else if (usePreferredBlockSize4LastUcBlock) {
+        size = getPreferredBlockSize();
+      }
+    }
+    //sum other blocks
     for(int i = 0; i < last; i++) {
-      bytes += blocks[i].getNumBytes();
+      size += blocks[i].getNumBytes();
     }
-    return bytes;
+    return size;
   }
-  
 
-  @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += 1;
-    counts.dsCount += diskspaceConsumed();
-    return counts;
+  public final long diskspaceConsumed() {
+    // use preferred block size for the last block if it is under construction
+    return computeFileSize(true, true) * getBlockReplication();
   }
 
-  long diskspaceConsumed() {
-    return diskspaceConsumed(blocks);
-  }
-  
-  private long diskspaceConsumed(Block[] blkArr) {
-    long size = 0;
-    if(blkArr == null) 
-      return 0;
-    
-    for (Block blk : blkArr) {
-      if (blk != null) {
-        size += blk.getNumBytes();
-      }
-    }
-    /* If the last block is being written to, use prefferedBlockSize
-     * rather than the actual block size.
-     */
-    if (blkArr.length > 0 && blkArr[blkArr.length-1] != null && 
-        isUnderConstruction()) {
-      size += getPreferredBlockSize() - blkArr[blkArr.length-1].getNumBytes();
+  public final long diskspaceConsumed(Snapshot lastSnapshot) {
+    if (lastSnapshot != null) {
+      return computeFileSize(lastSnapshot) * getFileReplication(lastSnapshot);
+    } else {
+      return diskspaceConsumed();
     }
-    return size * getBlockReplication();
   }
   
   /**
@@ -262,4 +467,16 @@ public class INodeFile extends INode imp
   public int numBlocks() {
     return blocks == null ? 0 : blocks.length;
   }
+
+  @VisibleForTesting
+  @Override
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
+      final Snapshot snapshot) {
+    super.dumpTreeRecursively(out, prefix, snapshot);
+    out.print(", fileSize=" + computeFileSize(snapshot));
+    // only compare the first block
+    out.print(", blocks=");
+    out.print(blocks == null || blocks.length == 0? null: blocks[0]);
+    out.println();
+  }
 }
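
To make the two computeFileSize flags above concrete, here is a minimal standalone sketch of the same size arithmetic; the block lengths, preferred block size, and replication factor are invented for illustration, and no HDFS types are used.

public class FileSizeSketch {
  // hypothetical block lengths in bytes; the last block is still under
  // construction and only half written
  static final long[] BLOCKS = { 128L << 20, 128L << 20, 64L << 20 };
  static final long PREFERRED_BLOCK_SIZE = 128L << 20;
  static final short REPLICATION = 3;
  static final boolean LAST_BLOCK_UNDER_CONSTRUCTION = true;

  static long computeFileSize(boolean includesLastUcBlock,
      boolean usePreferredBlockSize4LastUcBlock) {
    final int last = BLOCKS.length - 1;
    long size = BLOCKS[last];
    if (LAST_BLOCK_UNDER_CONSTRUCTION) {
      if (!includesLastUcBlock) {
        size = 0;
      } else if (usePreferredBlockSize4LastUcBlock) {
        size = PREFERRED_BLOCK_SIZE;
      }
    }
    // sum the completed blocks
    for (int i = 0; i < last; i++) {
      size += BLOCKS[i];
    }
    return size;
  }

  public static void main(String[] args) {
    final long mb = 1 << 20;
    // visible file length counts the uc block at its actual size
    System.out.println(computeFileSize(true, false) / mb);  // 320
    // length without the uc block
    System.out.println(computeFileSize(false, false) / mb); // 256
    // disk space reserves the full preferred size for the uc block
    System.out.println(computeFileSize(true, true) * REPLICATION / mb); // 1152
  }
}

With these numbers, the visible length is 320 MiB, the length without the under-construction block is 256 MiB, and the reserved disk space is 3 x 384 MiB.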

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Thu May  9 23:55:49 2013
@@ -24,17 +24,22 @@ import java.util.Arrays;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+
+import com.google.common.base.Preconditions;
 
 /**
  * I-node for file being written.
  */
 @InterfaceAudience.Private
-class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
+public class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
   /** Cast INode to INodeFileUnderConstruction. */
   public static INodeFileUnderConstruction valueOf(INode inode, String path
       ) throws FileNotFoundException {
@@ -57,11 +62,8 @@ class INodeFileUnderConstruction extends
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) {
-    super(id, permissions, BlockInfo.EMPTY_ARRAY, replication, modTime,
-        modTime, preferredBlockSize);
-    this.clientName = clientName;
-    this.clientMachine = clientMachine;
-    this.clientNode = clientNode;
+    this(id, null, replication, modTime, preferredBlockSize, BlockInfo.EMPTY_ARRAY,
+        permissions, clientName, clientMachine, clientNode);
   }
 
   INodeFileUnderConstruction(long id,
@@ -74,15 +76,24 @@ class INodeFileUnderConstruction extends
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) {
-    super(id, perm, blocks, blockReplication, modificationTime,
-        modificationTime, preferredBlockSize);
-    setLocalName(name);
+    super(id, name, perm, modificationTime, modificationTime,
+        blocks, blockReplication, preferredBlockSize);
+    this.clientName = clientName;
+    this.clientMachine = clientMachine;
+    this.clientNode = clientNode;
+  }
+  
+  public INodeFileUnderConstruction(final INodeFile that,
+      final String clientName,
+      final String clientMachine,
+      final DatanodeDescriptor clientNode) {
+    super(that);
     this.clientName = clientName;
     this.clientMachine = clientMachine;
     this.clientNode = clientNode;
   }
 
-  String getClientName() {
+  public String getClientName() {
     return clientName;
   }
 
@@ -90,51 +101,56 @@ class INodeFileUnderConstruction extends
     this.clientName = clientName;
   }
 
-  String getClientMachine() {
+  public String getClientMachine() {
     return clientMachine;
   }
 
-  DatanodeDescriptor getClientNode() {
+  public DatanodeDescriptor getClientNode() {
     return clientNode;
   }
 
-  /**
-   * Is this inode being constructed?
-   */
+  /** @return true unconditionally. */
   @Override
-  public boolean isUnderConstruction() {
+  public final boolean isUnderConstruction() {
     return true;
   }
 
-  //
-  // converts a INodeFileUnderConstruction into a INodeFile
-  // use the modification time as the access time
-  //
-  INodeFile convertToInodeFile() {
-    assert allBlocksComplete() : "Can't finalize inode " + this
-      + " since it contains non-complete blocks! Blocks are "
-      + Arrays.asList(getBlocks());
-    INodeFile obj = new INodeFile(getId(),
-                                  getPermissionStatus(),
-                                  getBlocks(),
-                                  getBlockReplication(),
-                                  getModificationTime(),
-                                  getModificationTime(),
-                                  getPreferredBlockSize());
-    return obj;
-    
-  }
-  
   /**
-   * @return true if all of the blocks in this file are marked as completed.
+   * Converts an INodeFileUnderConstruction to an INodeFile.
+   * The original modification time is used as the access time.
+   * The new modification time is the specified mtime.
    */
-  private boolean allBlocksComplete() {
-    for (BlockInfo b : getBlocks()) {
-      if (!b.isComplete()) {
-        return false;
-      }
+  protected INodeFile toINodeFile(long mtime) {
+    assertAllBlocksComplete();
+
+    final INodeFile f = new INodeFile(getId(), getLocalNameBytes(),
+        getPermissionStatus(), mtime, getModificationTime(),
+        getBlocks(), getFileReplication(), getPreferredBlockSize());
+    f.setParent(getParent());
+    return f;
+  }
+  
+  @Override
+  public INodeFileUnderConstruction recordModification(final Snapshot latest,
+      final INodeMap inodeMap) throws QuotaExceededException {
+    if (isInLatestSnapshot(latest)) {
+      INodeFileUnderConstructionWithSnapshot newFile = getParent()
+          .replaceChild4INodeFileUcWithSnapshot(this, inodeMap)
+          .recordModification(latest, inodeMap);
+      return newFile;
+    } else {
+      return this;
+    }
+  }
+
+  /** Assert all blocks are complete. */
+  protected void assertAllBlocksComplete() {
+    final BlockInfo[] blocks = getBlocks();
+    for (int i = 0; i < blocks.length; i++) {
+      Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
+          + " %s %s since blocks[%s] is non-complete, where blocks=%s.",
+          getClass().getSimpleName(), this, i, Arrays.asList(getBlocks()));
     }
-    return true;
   }
 
   /**

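The assertAllBlocksComplete() rewrite above relies on Guava's Preconditions.checkState, which substitutes the %s placeholders only when the check fails, so no message string is built on the happy path. Below is a self-contained sketch of the same pattern, using a stand-in Block type rather than the real BlockInfo.

import java.util.Arrays;
import com.google.common.base.Preconditions;

public class BlockCheckSketch {
  // stand-in for BlockInfo, just enough to exercise the check
  static class Block {
    final boolean complete;
    Block(boolean complete) { this.complete = complete; }
    boolean isComplete() { return complete; }
    @Override public String toString() {
      return complete ? "COMPLETE" : "UNDER_CONSTRUCTION";
    }
  }

  static void assertAllBlocksComplete(Block[] blocks) {
    for (int i = 0; i < blocks.length; i++) {
      // throws IllegalStateException with the %s placeholders filled in
      Preconditions.checkState(blocks[i].isComplete(),
          "Failed to finalize since blocks[%s] is non-complete, where blocks=%s.",
          i, Arrays.asList(blocks));
    }
  }

  public static void main(String[] args) {
    assertAllBlocksComplete(new Block[] { new Block(true) }); // passes silently
    try {
      assertAllBlocksComplete(new Block[] { new Block(true), new Block(false) });
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // ... blocks[1] is non-complete ...
    }
  }
}
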
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Thu May  9 23:55:49 2013
@@ -17,28 +17,55 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.PrintWriter;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 /**
  * An {@link INode} representing a symbolic link.
  */
 @InterfaceAudience.Private
-public class INodeSymlink extends INode {
+public class INodeSymlink extends INodeWithAdditionalFields {
   private final byte[] symlink; // The target URI
 
-  INodeSymlink(long id, String value, long mtime, long atime,
-      PermissionStatus permissions) {
-    super(id, permissions, mtime, atime);
-    this.symlink = DFSUtil.string2Bytes(value);
+  INodeSymlink(long id, byte[] name, PermissionStatus permissions,
+      long mtime, long atime, String symlink) {
+    super(id, name, permissions, mtime, atime);
+    this.symlink = DFSUtil.string2Bytes(symlink);
+  }
+  
+  INodeSymlink(INodeSymlink that) {
+    super(that);
+    this.symlink = that.symlink;
   }
 
   @Override
+  INode recordModification(Snapshot latest, final INodeMap inodeMap)
+      throws QuotaExceededException {
+    if (isInLatestSnapshot(latest)) {
+      INodeDirectory parent = getParent();
+      parent.saveChild2Snapshot(this, latest, new INodeSymlink(this), inodeMap);
+    }
+    return this;
+  }
+
+  /** @return true unconditionally. */
+  @Override
   public boolean isSymlink() {
     return true;
   }
 
+  /** @return this object. */
+  @Override
+  public INodeSymlink asSymlink() {
+    return this;
+  }
+
   public String getSymlinkString() {
     return DFSUtil.bytes2String(symlink);
   }
@@ -46,21 +73,39 @@ public class INodeSymlink extends INode 
   public byte[] getSymlink() {
     return symlink;
   }
+  
+  @Override
+  public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
+      final BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+    if (snapshot == null && prior == null) {
+      destroyAndCollectBlocks(collectedBlocks, removedINodes);
+    }
+    return Quota.Counts.newInstance(1, 0);
+  }
+  
+  @Override
+  public void destroyAndCollectBlocks(final BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) {
+    removedINodes.add(this);
+  }
 
   @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += 1;
+  public Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean updateCache, int lastSnapshotId) {
+    counts.add(Quota.NAMESPACE, 1);
     return counts;
   }
-  
+
   @Override
-  int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
-    return 1;
+  public Content.Counts computeContentSummary(final Content.Counts counts) {
+    counts.add(Content.SYMLINK, 1);
+    return counts;
   }
 
   @Override
-  long[] computeContentSummary(long[] summary) {
-    summary[1]++; // Increment the file count
-    return summary;
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
+      final Snapshot snapshot) {
+    super.dumpTreeRecursively(out, prefix, snapshot);
+    out.println();
   }
 }
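
The recordModification() override above is the snapshot branch's copy-on-write step: before a symlink that is captured by the latest snapshot is modified, a copy of its current state is saved into that snapshot. The toy sketch below shows the shape of the idea with invented types; the real code stores the copy in the parent directory's snapshot diff via saveChild2Snapshot.

import java.util.HashMap;
import java.util.Map;

public class CowSymlinkSketch {
  // stand-in for INodeSymlink; only the target matters here
  static class Symlink {
    String target;
    Symlink(String target) { this.target = target; }
    Symlink copy() { return new Symlink(target); }
  }

  // snapshot name -> state saved before the first modification
  static final Map<String, Symlink> snapshotDiff = new HashMap<String, Symlink>();

  static void recordModification(String latestSnapshot, Symlink link) {
    // save a copy at most once per snapshot; later modifications in the
    // same snapshot window keep reusing the first saved state
    if (!snapshotDiff.containsKey(latestSnapshot)) {
      snapshotDiff.put(latestSnapshot, link.copy());
    }
  }

  public static void main(String[] args) {
    Symlink link = new Symlink("/data/v1");
    recordModification("s1", link);
    link.target = "/data/v2";                          // mutate current state
    System.out.println(snapshotDiff.get("s1").target); // /data/v1
    System.out.println(link.target);                   // /data/v2
  }
}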

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Thu May  9 23:55:49 2013
@@ -256,7 +256,9 @@ public class LeaseManager {
     private String findPath(INodeFileUnderConstruction pendingFile) {
       try {
         for (String src : paths) {
-          if (fsnamesystem.dir.getINode(src) == pendingFile) {
+          INode node = fsnamesystem.dir.getINode(src);
+          if (node == pendingFile
+              || (node.isFile() && node.asFile() == pendingFile)) {
             return src;
           }
         }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu May  9 23:55:49 2013
@@ -58,11 +58,13 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
@@ -1093,4 +1095,64 @@ class NameNodeRpcServer implements Namen
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     return namesystem.getBlockManager().generateDataEncryptionKey();
   }
+
+  @Override
+  public String createSnapshot(String snapshotRoot, String snapshotName)
+      throws IOException {
+    if (!checkPathLength(snapshotRoot)) {
+      throw new IOException("createSnapshot: Pathname too long.  Limit "
+          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
+    }
+    metrics.incrCreateSnapshotOps();
+    return namesystem.createSnapshot(snapshotRoot, snapshotName);
+  }
+  
+  @Override
+  public void deleteSnapshot(String snapshotRoot, String snapshotName)
+      throws IOException {
+    metrics.incrDeleteSnapshotOps();
+    namesystem.deleteSnapshot(snapshotRoot, snapshotName);
+  }
+
+  @Override
+  // Client Protocol
+  public void allowSnapshot(String snapshotRoot) throws IOException {
+    metrics.incrAllowSnapshotOps();
+    namesystem.allowSnapshot(snapshotRoot);
+  }
+
+  @Override
+  // Client Protocol
+  public void disallowSnapshot(String snapshot) throws IOException {
+    metrics.incrDisAllowSnapshotOps();
+    namesystem.disallowSnapshot(snapshot);
+  }
+
+  @Override
+  public void renameSnapshot(String snapshotRoot, String snapshotOldName,
+      String snapshotNewName) throws IOException {
+    if (snapshotNewName == null || snapshotNewName.isEmpty()) {
+      throw new IOException("The new snapshot name is null or empty.");
+    }
+    metrics.incrRenameSnapshotOps();
+    namesystem.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
+  }
+
+  @Override // Client Protocol
+  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
+      throws IOException {
+    SnapshottableDirectoryStatus[] status = namesystem
+        .getSnapshottableDirListing();
+    metrics.incrListSnapshottableDirOps();
+    return status;
+  }
+
+  @Override
+  public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
+      String earlierSnapshotName, String laterSnapshotName) throws IOException {
+    SnapshotDiffReport report = namesystem.getSnapshotDiffReport(snapshotRoot,
+        earlierSnapshotName, laterSnapshotName);
+    metrics.incrSnapshotDiffReportOps();
+    return report;
+  }
 }
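
These server-side methods back the client snapshot API. The following is a hypothetical end-to-end use, assuming the DistributedFileSystem wrappers added on the same branch; the /data path and snapshot names are made up, and fs.defaultFS must point at a running HDFS namenode.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SnapshotRpcSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // assumes the default filesystem is HDFS
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    Path dir = new Path("/data"); // hypothetical snapshottable directory

    dfs.allowSnapshot(dir);                    // -> allowSnapshot RPC
    Path s1 = dfs.createSnapshot(dir, "s1");   // -> createSnapshot RPC
    System.out.println("created " + s1);       // e.g. /data/.snapshot/s1
    dfs.renameSnapshot(dir, "s1", "daily");    // -> renameSnapshot RPC
    dfs.deleteSnapshot(dir, "daily");          // -> deleteSnapshot RPC
    dfs.disallowSnapshot(dir);                 // -> disallowSnapshot RPC
  }
}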

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Thu May  9 23:55:49 2013
@@ -775,7 +775,16 @@ class NamenodeJspHelper {
       }
     }
   }
-  
+
+  private static String getLocalParentDir(INode inode) {
+    final INode parent = inode.isRoot() ? inode : inode.getParent();
+    String parentDir = "";
+    if (parent != null) {
+      parentDir = parent.getFullPathName();
+    }
+    return (parentDir != null) ? parentDir : "";
+  }
+
   // utility class used in block_info_xml.jsp
   static class XMLBlockInfo {
     final Block block;
@@ -790,7 +799,7 @@ class NamenodeJspHelper {
         this.inode = null;
       } else {
         this.block = new Block(blockId);
-        this.inode = (INodeFile) blockManager.getBlockCollection(block);
+        this.inode = ((INode)blockManager.getBlockCollection(block)).asFile();
       }
     }
 
@@ -817,7 +826,7 @@ class NamenodeJspHelper {
           doc.endTag();
 
           doc.startTag("local_directory");
-          doc.pcdata(inode.getLocalParentDir());
+          doc.pcdata(getLocalParentDir(inode));
           doc.endTag();
 
           doc.startTag("user_name");
@@ -849,7 +858,7 @@ class NamenodeJspHelper {
           doc.endTag();
 
           doc.startTag("replication");
-          doc.pcdata(""+inode.getBlockReplication());
+          doc.pcdata(""+inode.getFileReplication());
           doc.endTag();
 
           doc.startTag("disk_space_consumed");

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Thu May  9 23:55:49 2013
@@ -57,6 +57,20 @@ public class NameNodeMetrics {
   @Metric MutableCounterLong createSymlinkOps;
   @Metric MutableCounterLong getLinkTargetOps;
   @Metric MutableCounterLong filesInGetListingOps;
+  @Metric("Number of allowSnapshot operations")
+  MutableCounterLong allowSnapshotOps;
+  @Metric("Number of disallowSnapshot operations")
+  MutableCounterLong disallowSnapshotOps;
+  @Metric("Number of createSnapshot operations")
+  MutableCounterLong createSnapshotOps;
+  @Metric("Number of deleteSnapshot operations")
+  MutableCounterLong deleteSnapshotOps;
+  @Metric("Number of renameSnapshot operations")
+  MutableCounterLong renameSnapshotOps;
+  @Metric("Number of listSnapshottableDirectory operations")
+  MutableCounterLong listSnapshottableDirOps;
+  @Metric("Number of snapshotDiffReport operations")
+  MutableCounterLong snapshotDiffReportOps;
 
   @Metric("Journal transactions") MutableRate transactions;
   @Metric("Journal syncs") MutableRate syncs;
@@ -131,7 +145,7 @@ public class NameNodeMetrics {
     filesRenamed.incr();
   }
 
-  public void incrFilesDeleted(int delta) {
+  public void incrFilesDeleted(long delta) {
     filesDeleted.incr(delta);
   }
 
@@ -159,6 +173,34 @@ public class NameNodeMetrics {
     getLinkTargetOps.incr();
   }
 
+  public void incrAllowSnapshotOps() {
+    allowSnapshotOps.incr();
+  }
+  
+  public void incrDisAllowSnapshotOps() {
+    disallowSnapshotOps.incr();
+  }
+  
+  public void incrCreateSnapshotOps() {
+    createSnapshotOps.incr();
+  }
+  
+  public void incrDeleteSnapshotOps() {
+    deleteSnapshotOps.incr();
+  }
+  
+  public void incrRenameSnapshotOps() {
+    renameSnapshotOps.incr();
+  }
+  
+  public void incrListSnapshottableDirOps() {
+    listSnapshottableDirOps.incr();
+  }
+  
+  public void incrSnapshotDiffReportOps() {
+    snapshotDiffReportOps.incr();
+  }
+  
   public void addTransaction(long latency) {
     transactions.add(latency);
   }
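
The new counters follow the usual metrics2 pattern: an @Metric-annotated MutableCounterLong field is instantiated when the source is registered, and each RPC handler bumps it through a one-line increment method. A minimal sketch, assuming the standard metrics2 registration API; the source and prefix names are invented.

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

@Metrics(name = "SnapshotOpsSketch", context = "dfs")
public class SnapshotOpsSketch {
  @Metric("Number of createSnapshot operations")
  MutableCounterLong createSnapshotOps; // instantiated by register()

  public void incrCreateSnapshotOps() {
    createSnapshotOps.incr(); // thread-safe counter increment
  }

  public static void main(String[] args) {
    DefaultMetricsSystem.initialize("sketch");
    SnapshotOpsSketch source =
        DefaultMetricsSystem.instance().register(new SnapshotOpsSketch());
    source.incrCreateSnapshotOps();
    System.out.println(source.createSnapshotOps.value()); // 1
  }
}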

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Thu May  9 23:55:49 2013
@@ -407,6 +407,30 @@ public class DFSAdmin extends FsShell {
   }
 
   /**
+   * Allow snapshot on a directory.
+   * Usage: java DFSAdmin -allowSnapshot snapshotDir
+   * @param argv List of command line parameters.
+   * @exception IOException
+   */
+  public void allowSnapshot(String[] argv) throws IOException {
+    DistributedFileSystem dfs = getDFS();
+    dfs.allowSnapshot(new Path(argv[1]));
+    System.out.println("Allowing snapshot on " + argv[1] + " succeeded");
+  }
+  
+  /**
+   * Disallow snapshot on a directory.
+   * Usage: java DFSAdmin -disallowSnapshot snapshotDir
+   * @param argv List of command line parameters.
+   * @exception IOException
+   */
+  public void disallowSnapshot(String[] argv) throws IOException {
+    DistributedFileSystem dfs = getDFS();
+    dfs.disallowSnapshot(new Path(argv[1]));
+    System.out.println("Disallowing snapshot on " + argv[1] + " succeeded");
+  }
+  
+  /**
    * Command to ask the namenode to save the namespace.
    * Usage: java DFSAdmin -saveNamespace
    * @exception IOException 
@@ -547,6 +571,8 @@ public class DFSAdmin extends FsShell {
       "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
       "\t[-setBalancerBandwidth <bandwidth>]\n" +
       "\t[-fetchImage <local directory>]\n" +
+      "\t[-allowSnapshot <snapshotDir>]\n" +
+      "\t[-disallowSnapshot <snapshotDir>]\n" +
       "\t[-help [cmd]]\n";
 
     String report ="-report: \tReports basic filesystem information and statistics.\n";
@@ -637,6 +663,12 @@ public class DFSAdmin extends FsShell {
       "\tDownloads the most recent fsimage from the Name Node and saves it in" +
       "\tthe specified local directory.\n";
     
+    String allowSnapshot = "-allowSnapshot <snapshotDir>:\n" +
+        "\tAllow snapshots to be taken on a directory.\n";
+    
+    String disallowSnapshot = "-disallowSnapshot <snapshotDir>:\n" +
+        "\tDo not allow snapshots to be taken on a directory any more.\n";
+    
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -680,6 +712,10 @@ public class DFSAdmin extends FsShell {
       System.out.println(setBalancerBandwidth);
     } else if ("fetchImage".equals(cmd)) {
       System.out.println(fetchImage);
+    } else if ("allowSnapshot".equalsIgnoreCase(cmd)) {
+      System.out.println(allowSnapshot);
+    } else if ("disallowSnapshot".equalsIgnoreCase(cmd)) {
+      System.out.println(disallowSnapshot);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -704,6 +740,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(deleteBlockPool);
       System.out.println(setBalancerBandwidth);
       System.out.println(fetchImage);
+      System.out.println(allowSnapshot);
+      System.out.println(disallowSnapshot);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -879,7 +917,13 @@ public class DFSAdmin extends FsShell {
                          + " [-report]");
     } else if ("-safemode".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
-                         + " [-safemode enter | leave | get | wait]");
+          + " [-safemode enter | leave | get | wait]");
+    } else if ("-allowSnapshot".equalsIgnoreCase(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+          + " [-allowSnapshot <snapshotDir>]");
+    } else if ("-disallowSnapshot".equalsIgnoreCase(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+          + " [-disallowSnapshot <snapshotDir>]");
     } else if ("-saveNamespace".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-saveNamespace]");
@@ -938,7 +982,9 @@ public class DFSAdmin extends FsShell {
       System.err.println("Usage: java DFSAdmin");
       System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
       System.err.println("           [-report]");
-      System.err.println("           [-safemode enter | leave | get | wait]");
+      System.err.println("           [-safemode enter | leave | get | wait]"); 
+      System.err.println("           [-allowSnapshot <snapshotDir>]");
+      System.err.println("           [-disallowSnapshot <snapshotDir>]");
       System.err.println("           [-saveNamespace]");
       System.err.println("           [-rollEdits]");
       System.err.println("           [-restoreFailedStorage true|false|check]");
@@ -988,6 +1034,16 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-allowSnapshot".equalsIgnoreCase(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    } else if ("-disallowSnapshot".equalsIgnoreCase(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
     } else if ("-report".equals(cmd)) {
       if (argv.length != 1) {
         printUsage(cmd);
@@ -1079,6 +1135,10 @@ public class DFSAdmin extends FsShell {
         report();
       } else if ("-safemode".equals(cmd)) {
         setSafeMode(argv, i);
+      } else if ("-allowSnapshot".equalsIgnoreCase(cmd)) {
+        allowSnapshot(argv);
+      } else if ("-disallowSnapshot".equalsIgnoreCase(cmd)) {
+        disallowSnapshot(argv);
       } else if ("-saveNamespace".equals(cmd)) {
         exitCode = saveNamespace();
       } else if ("-rollEdits".equals(cmd)) {


