hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1424782 [1/2] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/ src/test/java/org/apache/hadoop/h...
Date Fri, 21 Dec 2012 01:30:50 GMT
Author: szetszwo
Date: Fri Dec 21 01:30:49 2012
New Revision: 1424782

URL: http://svn.apache.org/viewvc?rev=1424782&view=rev
Log:
HDFS-4103. Support O(1) snapshot creation.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotRename.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotReplication.java

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt Fri Dec 21 01:30:49 2012
@@ -85,3 +85,5 @@ Branch-2802 Snapshot (Unreleased)
   HDFS-4293. Fix TestSnapshot failure. (Jing Zhao via suresh)
 
   HDFS-4317. Change INode and its subclasses to support HDFS-4103. (szetszwo)
+
+  HDFS-4103. Support O(1) snapshot creation. (szetszwo)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Dec 21 01:30:49 2012
@@ -60,8 +60,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
@@ -82,7 +82,9 @@ public class FSDirectory implements Clos
     final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
         INodeDirectory.ROOT_NAME,
         namesystem.createFsOwnerPermissions(new FsPermission((short)0755)));
-    return INodeDirectorySnapshottable.newInstance(r, 0);
+    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
+    s.setSnapshotQuota(0);
+    return s;
   }
 
   INodeDirectoryWithQuota rootDir;
@@ -379,11 +381,9 @@ public class FSDirectory implements Clos
    */
   void closeFile(String path, INodeFile file) {
     waitForReady();
-    long now = now();
     writeLock();
     try {
       // file is closed
-      file.setModificationTime(now);
       fsImage.getEditLog().logCloseFile(path, file);
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
@@ -563,7 +563,7 @@ public class FSDirectory implements Clos
     
     boolean added = false;
     INode srcChild = null;
-    String srcChildName = null;
+    byte[] srcChildName = null;
     try {
       // remove src
       srcChild = removeLastINode(srcInodesInPath);
@@ -573,7 +573,7 @@ public class FSDirectory implements Clos
             + " because the source can not be removed");
         return false;
       }
-      srcChildName = srcChild.getLocalName();
+      srcChildName = srcChild.getLocalNameBytes();
       srcChild.setLocalName(dstComponents[dstInodes.length-1]);
       
       // add src to the destination
@@ -585,8 +585,10 @@ public class FSDirectory implements Clos
               + src + " is renamed to " + dst);
         }
         // update modification time of dst and the parent of src
-        srcInodes[srcInodes.length-2].updateModificationTime(timestamp);
-        dstInodes[dstInodes.length-2].updateModificationTime(timestamp);
+        srcInodes[srcInodes.length-2].updateModificationTime(timestamp,
+            srcInodesInPath.getLatestSnapshot());
+        dstInodes[dstInodes.length-2].updateModificationTime(timestamp,
+            dstInodesInPath.getLatestSnapshot());
         // update moved leases with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);        
         return true;
@@ -734,13 +736,13 @@ public class FSDirectory implements Clos
           + error);
       throw new IOException(error);
     }
-    final String srcChildName = removedSrc.getLocalName();
-    String dstChildName = null;
+    final byte[] srcChildName = removedSrc.getLocalNameBytes();
+    byte[] dstChildName = null;
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
         removedDst = removeLastINode(dstInodesInPath);
-        dstChildName = removedDst.getLocalName();
+        dstChildName = removedDst.getLocalNameBytes();
       }
 
       removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
@@ -752,8 +754,10 @@ public class FSDirectory implements Clos
               "DIR* FSDirectory.unprotectedRenameTo: " + src
               + " is renamed to " + dst);
         }
-        srcInodes[srcInodes.length - 2].updateModificationTime(timestamp);
-        dstInodes[dstInodes.length - 2].updateModificationTime(timestamp);
+        srcInodes[srcInodes.length - 2].updateModificationTime(timestamp,
+            srcInodesInPath.getLatestSnapshot());
+        dstInodes[dstInodes.length - 2].updateModificationTime(timestamp,
+            dstInodesInPath.getLatestSnapshot());
         // update moved lease with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);
 
@@ -829,7 +833,7 @@ public class FSDirectory implements Clos
     long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
     updateCount(inodesInPath, inodes.length-1, 0, dsDelta, true);
 
-    fileNode.setFileReplication(replication);
+    fileNode.setFileReplication(replication, inodesInPath.getLatestSnapshot());
 
     if (oldReplication != null) {
       oldReplication[0] = oldRepl;
@@ -899,11 +903,12 @@ public class FSDirectory implements Clos
       throws FileNotFoundException, UnresolvedLinkException,
       SnapshotAccessControlException {
     assert hasWriteLock();
-    INode inode = rootDir.getMutableNode(src, true);
+    final INodesInPath inodesInPath = rootDir.getMutableINodesInPath(src, true);
+    final INode inode = inodesInPath.getLastINode();
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
-    inode.setPermission(permissions);
+    inode.setPermission(permissions, inodesInPath.getLatestSnapshot());
   }
 
   void setOwner(String src, String username, String groupname)
@@ -922,15 +927,16 @@ public class FSDirectory implements Clos
       throws FileNotFoundException, UnresolvedLinkException,
       SnapshotAccessControlException {
     assert hasWriteLock();
-    INode inode = rootDir.getMutableNode(src, true);
+    final INodesInPath inodesInPath = rootDir.getMutableINodesInPath(src, true);
+    final INode inode = inodesInPath.getLastINode();
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
     if (username != null) {
-      inode.setUser(username);
+      inode.setUser(username, inodesInPath.getLatestSnapshot());
     }
     if (groupname != null) {
-      inode.setGroup(groupname);
+      inode.setGroup(groupname, inodesInPath.getLatestSnapshot());
     }
   }
 
@@ -973,6 +979,7 @@ public class FSDirectory implements Clos
     final INode[] trgINodes = trgINodesInPath.getINodes();
     INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
     INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
+    final Snapshot trgLatestSnapshot = trgINodesInPath.getLatestSnapshot();
     
     INodeFile [] allSrcInodes = new INodeFile[srcs.length];
     int i = 0;
@@ -990,12 +997,12 @@ public class FSDirectory implements Clos
       if(nodeToRemove == null) continue;
       
       nodeToRemove.setBlocks(null);
-      trgParent.removeChild(nodeToRemove);
+      trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
       count++;
     }
     
-    trgInode.setModificationTime(timestamp);
-    trgParent.updateModificationTime(timestamp);
+    trgInode.setModificationTime(timestamp, trgLatestSnapshot);
+    trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
     unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
   }
@@ -1125,16 +1132,23 @@ public class FSDirectory implements Clos
       BlocksMapUpdateInfo collectedBlocks, long mtime) {
     assert hasWriteLock();
 
-    final INode[] inodes = inodesInPath.getINodes();
-    INode targetNode = inodes[inodes.length-1];
     // Remove the node from the namespace
-    targetNode = removeLastINode(inodesInPath);
+    final INode targetNode = removeLastINode(inodesInPath);
     if (targetNode == null) {
       return 0;
     }
     // set the parent's modification time
-    inodes[inodes.length - 2].updateModificationTime(mtime);
-    int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
+    final INode[] inodes = inodesInPath.getINodes();
+    final Snapshot latestSnapshot = inodesInPath.getLatestSnapshot();
+    final INodeDirectory parent = (INodeDirectory)inodes[inodes.length - 2];
+    parent.updateModificationTime(mtime, latestSnapshot);
+
+    final INode snapshotCopy = parent.getChild(targetNode.getLocalNameBytes(),
+        latestSnapshot);
+    // if snapshotCopy == targetNode, it means that the file is also stored in
+    // a snapshot so that the block should not be removed.
+    final int filesRemoved = snapshotCopy == targetNode? 0
+        : targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
           + targetNode.getFullPathName() + " is removed");
@@ -1167,34 +1181,20 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Replaces the specified INodeDirectory.
-   */
-  public void replaceINodeDirectory(String path, INodeDirectory oldnode,
-      INodeDirectory newnode) throws IOException {    
-    writeLock();
-    try {
-      unprotectedReplaceINode(path, oldnode, newnode);
-      // Note that the parent of the children of the oldnode is already updated
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
    * Replaces the specified INodeFile with the specified one.
    */
   public void replaceINodeFile(String path, INodeFile oldnode,
-      INodeFile newnode) throws IOException {    
+      INodeFile newnode, Snapshot latest) throws IOException {
     writeLock();
     try {
-      unprotectedReplaceINodeFile(path, oldnode, newnode);
+      unprotectedReplaceINodeFile(path, oldnode, newnode, latest);
     } finally {
       writeUnlock();
     }
   }
 
   private void unprotectedReplaceINode(String path, INode oldnode,
-      INode newnode) throws IOException {    
+      INode newnode, Snapshot latest) throws IOException {    
     Preconditions.checkState(hasWriteLock());
 
     INodeDirectory parent = oldnode.getParent();
@@ -1206,20 +1206,19 @@ public class FSDirectory implements Clos
       throw new IOException(mess);
     }
     
-    final INode removed = parent.removeChild(oldnode);
+    final INode removed = parent.removeChild(oldnode, latest);
     Preconditions.checkState(removed == oldnode,
         "removed != oldnode=%s, removed=%s", oldnode, removed);
 
     parent = oldnode.getParent();
     oldnode.setParent(null);
-    parent.addChild(newnode, true);
-
+    parent.addChild(newnode, true, latest);
   }
 
   void unprotectedReplaceINodeFile(String path, INodeFile oldnode,
-      INodeFile newnode)
-      throws IOException, UnresolvedLinkException {
-    unprotectedReplaceINode(path, oldnode, newnode);
+      INodeFile newnode, Snapshot latest
+      ) throws IOException, UnresolvedLinkException {
+    unprotectedReplaceINode(path, oldnode, newnode, latest);
     newnode.setLocalName(oldnode.getLocalNameBytes());
     
     /* Currently oldnode and newnode are assumed to contain the same
@@ -1248,6 +1247,7 @@ public class FSDirectory implements Clos
     readLock();
     try {
       final INodesInPath inodesInPath = rootDir.getINodesInPath(srcs, true);
+      final Snapshot snapshot = inodesInPath.getPathSnapshot();
       final INode targetNode = inodesInPath.getINode(0);
       if (targetNode == null)
         return null;
@@ -1255,19 +1255,20 @@ public class FSDirectory implements Clos
       if (!targetNode.isDirectory()) {
         return new DirectoryListing(
             new HdfsFileStatus[]{createFileStatus(HdfsFileStatus.EMPTY_NAME,
-                targetNode, needLocation)}, 0);
+                targetNode, needLocation, snapshot)}, 0);
       }
 
       INodeDirectory dirInode = (INodeDirectory)targetNode;
       final ReadOnlyList<INode> contents = dirInode.getChildrenList(
           inodesInPath.getPathSnapshot());
-      int startChild = dirInode.nextChild(startAfter);
+      int startChild = INodeDirectory.nextChild(contents, startAfter);
       int totalNumChildren = contents.size();
       int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
       HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
       for (int i=0; i<numOfListing; i++) {
         INode cur = contents.get(startChild+i);
-        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation);
+        listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
+            needLocation, snapshot);
       }
       return new DirectoryListing(
           listing, totalNumChildren-startChild-numOfListing);
@@ -1287,13 +1288,10 @@ public class FSDirectory implements Clos
     String srcs = normalizePath(src);
     readLock();
     try {
-      INode targetNode = rootDir.getNode(srcs, resolveLink);
-      if (targetNode == null) {
-        return null;
-      }
-      else {
-        return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
-      }
+      final INodesInPath inodesInPath = rootDir.getINodesInPath(srcs, resolveLink);
+      final INode i = inodesInPath.getINode(0);
+      return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
+          inodesInPath.getPathSnapshot());
     } finally {
       readUnlock();
     }
@@ -1668,8 +1666,7 @@ public class FSDirectory implements Clos
   private boolean addINode(String src, INode child
       ) throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
-    byte[] path = components[components.length-1];
-    child.setLocalName(path);
+    child.setLocalName(components[components.length-1]);
     cacheName(child);
     writeLock();
     try {
@@ -1834,7 +1831,8 @@ public class FSDirectory implements Clos
     if (inodes[pos-1] == null) {
       throw new NullPointerException("Panic: parent does not exist");
     }
-    final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
+    final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true,
+        inodesInPath.getLatestSnapshot());
     if (!added) {
       updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
     }
@@ -1858,7 +1856,8 @@ public class FSDirectory implements Clos
   private INode removeLastINode(final INodesInPath inodesInPath) {
     final INode[] inodes = inodesInPath.getINodes();
     final int pos = inodes.length - 1;
-    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
+    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(
+        inodes[pos], inodesInPath.getLatestSnapshot());
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
@@ -1999,9 +1998,8 @@ public class FSDirectory implements Clos
     }
     
     String srcs = normalizePath(src);
-    final INode[] inodes = rootDir.getMutableINodesInPath(srcs, true)
-        .getINodes();
-    INodeDirectory dirNode = INodeDirectory.valueOf(inodes[inodes.length-1], srcs);
+    final INodesInPath iip = rootDir.getMutableINodesInPath(srcs, true);
+    INodeDirectory dirNode = INodeDirectory.valueOf(iip.getLastINode(), srcs);
     if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
       throw new IllegalArgumentException("Cannot clear namespace quota on root.");
     } else { // a directory inode
@@ -2014,24 +2012,17 @@ public class FSDirectory implements Clos
         dsQuota = oldDsQuota;
       }        
 
+      final Snapshot latest = iip.getLatestSnapshot();
       if (dirNode instanceof INodeDirectoryWithQuota) { 
         // a directory with quota; so set the quota to the new value
-        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
-        if (!dirNode.isQuotaSet()) {
+        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota, latest);
+        if (!dirNode.isQuotaSet() && latest == null) {
           // will not come here for root because root's nsQuota is always set
-          INodeDirectory newNode = new INodeDirectory(dirNode, true);
-          INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
-          dirNode = newNode;
-          parent.replaceChild(newNode);
+          return dirNode.replaceSelf4INodeDirectory();
         }
       } else {
         // a non-quota directory; so replace it with a directory with quota
-        final INodeDirectoryWithQuota newNode = new INodeDirectoryWithQuota(
-            dirNode, true, nsQuota, dsQuota);
-        // non-root directory node; parent != null
-        INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
-        dirNode = newNode;
-        parent.replaceChild(newNode);
+        return dirNode.replaceSelf4Quota(latest, oldNsQuota, oldDsQuota);
       }
       return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
     }
@@ -2070,11 +2061,12 @@ public class FSDirectory implements Clos
   /**
    * Sets the access time on the file/directory. Logs it in the transaction log.
    */
-  void setTimes(String src, INode inode, long mtime, long atime, boolean force) {
+  void setTimes(String src, INode inode, long mtime, long atime, boolean force,
+      Snapshot latest) {
     boolean status = false;
     writeLock();
     try {
-      status = unprotectedSetTimes(src, inode, mtime, atime, force);
+      status = unprotectedSetTimes(src, inode, mtime, atime, force, latest);
     } finally {
       writeUnlock();
     }
@@ -2086,27 +2078,28 @@ public class FSDirectory implements Clos
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
       throws UnresolvedLinkException {
     assert hasWriteLock();
-    INode inode = getINode(src);
-    return unprotectedSetTimes(src, inode, mtime, atime, force);
+    final INodesInPath i = getINodesInPath(src); 
+    return unprotectedSetTimes(src, i.getLastINode(), mtime, atime, force,
+        i.getLatestSnapshot());
   }
 
   private boolean unprotectedSetTimes(String src, INode inode, long mtime,
-                                      long atime, boolean force) {
+      long atime, boolean force, Snapshot latest) {
     assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {
-      inode.setModificationTime(mtime);
+      inode.setModificationTime(mtime, latest);
       status = true;
     }
     if (atime != -1) {
-      long inodeTime = inode.getAccessTime();
+      long inodeTime = inode.getAccessTime(null);
 
       // if the last access time update was within the last precision interval, then
       // no need to store access time
       if (atime <= inodeTime + getFSNamesystem().getAccessTimePrecision() && !force) {
         status =  false;
       } else {
-        inode.setAccessTime(atime);
+        inode.setAccessTime(atime, latest);
         status = true;
       }
     } 
@@ -2137,17 +2130,18 @@ public class FSDirectory implements Clos
    * @throws IOException if any error occurs
    */
   private HdfsFileStatus createFileStatus(byte[] path, INode node,
-      boolean needLocation) throws IOException {
+      boolean needLocation, Snapshot snapshot) throws IOException {
     if (needLocation) {
-      return createLocatedFileStatus(path, node);
+      return createLocatedFileStatus(path, node, snapshot);
     } else {
-      return createFileStatus(path, node);
+      return createFileStatus(path, node, snapshot);
     }
   }
   /**
    * Create FileStatus by file INode 
    */
-   private HdfsFileStatus createFileStatus(byte[] path, INode node) {
+   private HdfsFileStatus createFileStatus(byte[] path, INode node,
+       Snapshot snapshot) {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
@@ -2162,11 +2156,11 @@ public class FSDirectory implements Clos
         node.isDirectory(), 
         replication, 
         blocksize,
-        node.getModificationTime(),
-        node.getAccessTime(),
-        node.getFsPermission(),
-        node.getUserName(),
-        node.getGroupName(),
+        node.getModificationTime(snapshot),
+        node.getAccessTime(snapshot),
+        node.getFsPermission(snapshot),
+        node.getUserName(snapshot),
+        node.getGroupName(snapshot),
         node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
         path);
   }
@@ -2175,7 +2169,7 @@ public class FSDirectory implements Clos
     * Create FileStatus with location info by file INode 
     */
     private HdfsLocatedFileStatus createLocatedFileStatus(
-        byte[] path, INode node) throws IOException {
+        byte[] path, INode node, Snapshot snapshot) throws IOException {
       assert hasReadLock();
       long size = 0;     // length is zero for directories
       short replication = 0;
@@ -2198,11 +2192,11 @@ public class FSDirectory implements Clos
           node.isDirectory(), 
           replication, 
           blocksize,
-          node.getModificationTime(),
-          node.getAccessTime(),
-          node.getFsPermission(),
-          node.getUserName(),
-          node.getGroupName(),
+          node.getModificationTime(snapshot),
+          node.getAccessTime(snapshot),
+          node.getFsPermission(snapshot),
+          node.getUserName(snapshot),
+          node.getGroupName(snapshot),
           node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
           path,
           loc);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Dec 21 01:30:49 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.Holder;
 
@@ -245,7 +246,8 @@ public class FSEditLogLoader {
       // 3. OP_ADD to open file for append
 
       // See if the file already exists (persistBlocks call)
-      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+      final INodesInPath iip = fsDir.getINodesInPath(addCloseOp.path);
+      final INodeFile oldFile = toINodeFile(iip.getINode(0), addCloseOp.path);
       INodeFile newFile = oldFile;
       if (oldFile == null) { // this is OP_ADD on a new file (case 1)
         // versions > 0 support per file replication
@@ -271,7 +273,7 @@ public class FSEditLogLoader {
           }
           fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
               addCloseOp.clientName, addCloseOp.clientMachine, null,
-              false);
+              false, iip.getLatestSnapshot());
           newFile = getINodeFile(fsDir, addCloseOp.path);
         }
       }
@@ -280,8 +282,8 @@ public class FSEditLogLoader {
       // update the block list.
       
       // Update the salient file attributes.
-      newFile.setAccessTime(addCloseOp.atime);
-      newFile.setModificationTime(addCloseOp.mtime);
+      newFile.setAccessTime(addCloseOp.atime, null);
+      newFile.setModificationTime(addCloseOp.mtime, null);
       updateBlocks(fsDir, addCloseOp, newFile);
       break;
     }
@@ -295,15 +297,16 @@ public class FSEditLogLoader {
             " clientMachine " + addCloseOp.clientMachine);
       }
 
-      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+      final INodesInPath iip = fsDir.getINodesInPath(addCloseOp.path);
+      final INodeFile oldFile = toINodeFile(iip.getINode(0), addCloseOp.path);
       if (oldFile == null) {
         throw new IOException("Operation trying to close non-existent file " +
             addCloseOp.path);
       }
       
       // Update the salient file attributes.
-      oldFile.setAccessTime(addCloseOp.atime);
-      oldFile.setModificationTime(addCloseOp.mtime);
+      oldFile.setAccessTime(addCloseOp.atime, null);
+      oldFile.setModificationTime(addCloseOp.mtime, null);
       updateBlocks(fsDir, addCloseOp, oldFile);
 
       // Now close the file
@@ -321,7 +324,8 @@ public class FSEditLogLoader {
         INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
         fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
         INodeFile newFile = ucFile.convertToInodeFile(ucFile.getModificationTime());
-        fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile);
+        fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile,
+            iip.getLatestSnapshot());
       }
       break;
     }
@@ -506,7 +510,11 @@ public class FSEditLogLoader {
   
   private static INodeFile getINodeFile(FSDirectory fsDir, String path)
       throws IOException {
-    INode inode = fsDir.getINode(path);
+    return toINodeFile(fsDir.getINode(path), path);
+  }
+
+  private static INodeFile toINodeFile(INode inode, String path)
+      throws IOException {
     if (inode != null) {
       if (!(inode instanceof INodeFile)) {
         throw new IOException("Operation trying to get non-file " + path);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Dec 21 01:30:49 2012
@@ -204,7 +204,7 @@ class FSImageFormat {
     long dsQuota = root.getDsQuota();
     FSDirectory fsDir = namesystem.dir;
     if (nsQuota != -1 || dsQuota != -1) {
-      fsDir.rootDir.setQuota(nsQuota, dsQuota);
+      fsDir.rootDir.setQuota(nsQuota, dsQuota, null);
     }
     fsDir.rootDir.cloneModificationTime(root);
     fsDir.rootDir.clonePermissionStatus(root);    
@@ -321,7 +321,7 @@ class FSImageFormat {
    */
   private void addToParent(INodeDirectory parent, INode child) {
     // NOTE: This does not update space counts for parents
-    if (!parent.addChild(child, false)) {
+    if (!parent.addChild(child, false, null)) {
       return;
     }
     namesystem.dir.cacheName(child);
@@ -404,8 +404,10 @@ class FSImageFormat {
 
         // verify that file exists in namespace
         String path = cons.getLocalName();
-        INodeFile oldnode = INodeFile.valueOf(fsDir.getINode(path), path);
-        fsDir.replaceINodeFile(path, oldnode, cons);
+        final INodesInPath iip = fsDir.getINodesInPath(path);
+        INodeFile oldnode = INodeFile.valueOf(iip.getINode(0), path);
+        fsDir.unprotectedReplaceINodeFile(path, oldnode, cons,
+            iip.getLatestSnapshot());
         namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Dec 21 01:30:49 2012
@@ -176,6 +176,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -547,7 +548,7 @@ public class FSNamesystem implements Nam
 
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(fsImage, this, conf);
-      this.snapshotManager = new SnapshotManager(this, dir);
+      this.snapshotManager = new SnapshotManager(dir);
       this.safeMode = new SafeModeInfo(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -1322,7 +1323,8 @@ public class FSNamesystem implements Nam
         }
 
         long now = now();
-        final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
+        final INodesInPath iip = dir.getMutableINodesInPath(src);
+        final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
         if (doAccessTime && isAccessTimeSupported()) {
           if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
             // if we have to set access time but we only have the readlock, then
@@ -1331,7 +1333,7 @@ public class FSNamesystem implements Nam
               continue;
             }
           }
-          dir.setTimes(src, inode, -1, now, false);
+          dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshot());
         }
         return blockManager.createLocatedBlocks(inode.getBlocks(),
             inode.computeFileSize(false), inode.isUnderConstruction(),
@@ -1559,9 +1561,10 @@ public class FSNamesystem implements Nam
       if (isPermissionEnabled) {
         checkPathAccess(src, FsAction.WRITE);
       }
-      INode inode = dir.getMutableINode(src);
+      final INodesInPath iip = dir.getMutableINodesInPath(src);
+      final INode inode = iip.getLastINode();
       if (inode != null) {
-        dir.setTimes(src, inode, mtime, atime, true);
+        dir.setTimes(src, inode, mtime, atime, true, iip.getLatestSnapshot());
         if (isAuditEnabled() && isExternalInvocation()) {
           final HdfsFileStatus stat = dir.getFileInfo(src, false);
           logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -1864,7 +1867,9 @@ public class FSNamesystem implements Nam
     try {
       blockManager.verifyReplication(src, replication, clientMachine);
       boolean create = flag.contains(CreateFlag.CREATE);
-      final INode myFile = dir.getINode(src);
+      
+      final INodesInPath iip = dir.getINodesInPath(src);
+      final INode myFile = iip.getINode(0);
       if (myFile == null) {
         if (!create) {
           throw new FileNotFoundException("failed to overwrite or append to non-existent file "
@@ -1891,8 +1896,8 @@ public class FSNamesystem implements Nam
 
       if (append && myFile != null) {
         final INodeFile f = INodeFile.valueOf(myFile, src); 
-        return prepareFileForWrite(
-            src, f, holder, clientMachine, clientNode, true);
+        return prepareFileForWrite(src, f, holder, clientMachine, clientNode,
+            true, iip.getLatestSnapshot());
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1940,7 +1945,7 @@ public class FSNamesystem implements Nam
    */
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
       String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog) throws IOException {
+      boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
     //TODO SNAPSHOT: INodeFileUnderConstruction with link
     INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
                                     file.getLocalNameBytes(),
@@ -1952,7 +1957,7 @@ public class FSNamesystem implements Nam
                                     leaseHolder,
                                     clientMachine,
                                     clientNode);
-    dir.replaceINodeFile(src, file, cons);
+    dir.replaceINodeFile(src, file, cons, latestSnapshot);
     leaseManager.addLease(cons.getClientName(), src);
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
@@ -2288,17 +2293,16 @@ public class FSNamesystem implements Nam
         throw new SafeModeException("Cannot add block to " + src, safeMode);
       }
 
-      final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
-      final INode[] inodes = inodesInPath.getINodes();
+      final INodesInPath iip = dir.rootDir.getExistingPathINodes(src, true);
       final INodeFileUnderConstruction pendingFile
-          = checkLease(src, clientName, inodes[inodes.length - 1]);
+          = checkLease(src, clientName, iip.getLastINode());
                                                            
       if (!checkFileProgress(pendingFile, false)) {
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, inodesInPath, targets);
+      newBlock = allocateBlock(src, iip, targets);
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
@@ -2401,10 +2405,9 @@ public class FSNamesystem implements Nam
     return true;
   }
   
-  // make sure that we still have the lease on this file.
+  /** make sure that we still have the lease on this file. */
   private INodeFileUnderConstruction checkLease(String src, String holder) 
       throws LeaseExpiredException, UnresolvedLinkException {
-    assert hasReadOrWriteLock();
     return checkLease(src, holder, dir.getINode(src));
   }
 
@@ -2468,9 +2471,10 @@ public class FSNamesystem implements Nam
       throw new SafeModeException("Cannot complete file " + src, safeMode);
     }
 
-    INodeFileUnderConstruction pendingFile;
+    final INodesInPath iip = dir.getINodesInPath(src);
+    final INodeFileUnderConstruction pendingFile;
     try {
-      pendingFile = checkLease(src, holder);
+      pendingFile = checkLease(src, holder, iip.getINode(0)); 
     } catch (LeaseExpiredException lee) {
       final INode inode = dir.getINode(src);
       if (inode != null && inode instanceof INodeFile && !inode.isUnderConstruction()) {
@@ -2498,7 +2502,8 @@ public class FSNamesystem implements Nam
       return false;
     }
 
-    finalizeINodeFileUnderConstruction(src, pendingFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile,
+        iip.getLatestSnapshot());
 
     NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
         + holder);
@@ -3110,8 +3115,9 @@ public class FSNamesystem implements Nam
     assert !isInSafeMode();
     assert hasWriteLock();
 
+    final INodesInPath iip = dir.getINodesInPath(src);
     final INodeFileUnderConstruction pendingFile
-        = INodeFileUnderConstruction.valueOf(dir.getINode(src), src);
+        = INodeFileUnderConstruction.valueOf(iip.getINode(0), src);
     int nrBlocks = pendingFile.numBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
 
@@ -3128,7 +3134,8 @@ public class FSNamesystem implements Nam
     // If there are no incomplete blocks associated with this file,
     // then reap lease immediately and close the file.
     if(nrCompleteBlocks == nrBlocks) {
-      finalizeINodeFileUnderConstruction(src, pendingFile);
+      finalizeINodeFileUnderConstruction(src, pendingFile,
+          iip.getLatestSnapshot());
       NameNode.stateChangeLog.warn("BLOCK*"
         + " internalReleaseLease: All existing blocks are COMPLETE,"
         + " lease removed, file closed.");
@@ -3176,7 +3183,8 @@ public class FSNamesystem implements Nam
       // Close file if committed blocks are minimally replicated
       if(penultimateBlockMinReplication &&
           blockManager.checkMinReplication(lastBlock)) {
-        finalizeINodeFileUnderConstruction(src, pendingFile);
+        finalizeINodeFileUnderConstruction(src, pendingFile,
+            iip.getLatestSnapshot());
         NameNode.stateChangeLog.warn("BLOCK*"
           + " internalReleaseLease: Committed blocks are minimally replicated,"
           + " lease removed, file closed.");
@@ -3254,7 +3262,7 @@ public class FSNamesystem implements Nam
   }
 
   private void finalizeINodeFileUnderConstruction(String src, 
-      INodeFileUnderConstruction pendingFile) 
+      INodeFileUnderConstruction pendingFile, Snapshot latestSnapshot) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     leaseManager.removeLease(pendingFile.getClientName(), src);
@@ -3262,7 +3270,7 @@ public class FSNamesystem implements Nam
     // The file is no longer pending.
     // Create permanent INode, update blocks
     INodeFile newFile = pendingFile.convertToInodeFile(now());
-    dir.replaceINodeFile(src, pendingFile, newFile);
+    dir.replaceINodeFile(src, pendingFile, newFile, latestSnapshot);
 
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
@@ -3354,7 +3362,8 @@ public class FSNamesystem implements Nam
         commitOrCompleteLastBlock(pendingFile, storedBlock);
 
         //remove lease, close file
-        finalizeINodeFileUnderConstruction(src, pendingFile);
+        finalizeINodeFileUnderConstruction(src, pendingFile,
+            Snapshot.findLatestSnapshot(pendingFile));
       } else {
         // If this commit does not want to close the file, persist blocks
         dir.persistBlocks(src, pendingFile);
@@ -5625,8 +5634,7 @@ public class FSNamesystem implements Nam
       }
       checkOwner(path);
 
-      //TODO: do not hardcode snapshot quota value
-      snapshotManager.setSnapshottable(path, 256);
+      snapshotManager.setSnapshottable(path);
       getEditLog().logAllowSnapshot(path);
     } finally {
       writeUnlock();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Fri Dec 21 01:30:49 2012
@@ -105,7 +105,6 @@ class FSPermissionChecker {
    * @param subAccess If path is a directory,
    * it is the access required of the path and all the sub-directories.
    * If path is not a directory, there is no effect.
-   * @return a PermissionChecker object which caches data for later use.
    * @throws AccessControlException
    * @throws UnresolvedLinkException
    */
@@ -124,45 +123,47 @@ class FSPermissionChecker {
     // check if (parentAccess != null) && file exists, then check sb
       // Resolve symlinks, the check is performed on the link target.
       final INodesInPath inodesInPath = root.getExistingPathINodes(path, true); 
+      final Snapshot snapshot = inodesInPath.getPathSnapshot();
       final INode[] inodes = inodesInPath.getINodes();
       int ancestorIndex = inodes.length - 2;
       for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
           ancestorIndex--);
-      checkTraverse(inodes, ancestorIndex);
+      checkTraverse(inodes, ancestorIndex, snapshot);
 
+      final INode last = inodes[inodes.length - 1];
       if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-          && inodes.length > 1 && inodes[inodes.length - 1] != null) {
-        checkStickyBit(inodes[inodes.length - 2], inodes[inodes.length - 1]);
+          && inodes.length > 1 && last != null) {
+        checkStickyBit(inodes[inodes.length - 2], last, snapshot);
       }
       if (ancestorAccess != null && inodes.length > 1) {
-        check(inodes, ancestorIndex, ancestorAccess);
+        check(inodes, ancestorIndex, snapshot, ancestorAccess);
       }
       if (parentAccess != null && inodes.length > 1) {
-        check(inodes, inodes.length - 2, parentAccess);
+        check(inodes, inodes.length - 2, snapshot, parentAccess);
       }
       if (access != null) {
-        check(inodes[inodes.length - 1], access);
+        check(last, snapshot, access);
       }
       if (subAccess != null) {
-        final Snapshot s = inodesInPath.getPathSnapshot();
-        checkSubAccess(inodes[inodes.length - 1], s, subAccess);
+        checkSubAccess(last, snapshot, subAccess);
       }
       if (doCheckOwner) {
-        checkOwner(inodes[inodes.length - 1]);
+        checkOwner(last, snapshot);
       }
   }
 
-  private void checkOwner(INode inode) throws AccessControlException {
-    if (inode != null && user.equals(inode.getUserName())) {
+  private void checkOwner(INode inode, Snapshot snapshot
+      ) throws AccessControlException {
+    if (inode != null && user.equals(inode.getUserName(snapshot))) {
       return;
     }
     throw new AccessControlException("Permission denied");
   }
 
-  private void checkTraverse(INode[] inodes, int last
+  private void checkTraverse(INode[] inodes, int last, Snapshot snapshot
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
-      check(inodes[j], FsAction.EXECUTE);
+      check(inodes[j], snapshot, FsAction.EXECUTE);
     }
   }
 
@@ -175,7 +176,7 @@ class FSPermissionChecker {
     Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
     for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
       INodeDirectory d = directories.pop();
-      check(d, access);
+      check(d, snapshot, access);
 
       for(INode child : d.getChildrenList(snapshot)) {
         if (child.isDirectory()) {
@@ -185,22 +186,22 @@ class FSPermissionChecker {
     }
   }
 
-  private void check(INode[] inodes, int i, FsAction access
+  private void check(INode[] inodes, int i, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
-    check(i >= 0? inodes[i]: null, access);
+    check(i >= 0? inodes[i]: null, snapshot, access);
   }
 
-  private void check(INode inode, FsAction access
+  private void check(INode inode, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     if (inode == null) {
       return;
     }
-    FsPermission mode = inode.getFsPermission();
+    FsPermission mode = inode.getFsPermission(snapshot);
 
-    if (user.equals(inode.getUserName())) { //user class
+    if (user.equals(inode.getUserName(snapshot))) { //user class
       if (mode.getUserAction().implies(access)) { return; }
     }
-    else if (groups.contains(inode.getGroupName())) { //group class
+    else if (groups.contains(inode.getGroupName(snapshot))) { //group class
       if (mode.getGroupAction().implies(access)) { return; }
     }
     else { //other class
@@ -210,18 +211,19 @@ class FSPermissionChecker {
         + ", access=" + access + ", inode=" + inode);
   }
 
-  private void checkStickyBit(INode parent, INode inode) throws AccessControlException {
-    if(!parent.getFsPermission().getStickyBit()) {
+  private void checkStickyBit(INode parent, INode inode, Snapshot snapshot
+      ) throws AccessControlException {
+    if(!parent.getFsPermission(snapshot).getStickyBit()) {
       return;
     }
 
     // If this user is the directory owner, return
-    if(parent.getUserName().equals(user)) {
+    if(parent.getUserName(snapshot).equals(user)) {
       return;
     }
 
     // if this user is the file owner, return
-    if(inode.getUserName().equals(user)) {
+    if(inode.getUserName(snapshot).equals(user)) {
       return;
     }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Dec 21 01:30:49 2012
@@ -35,10 +35,12 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.SignedBytes;
 
 /**
@@ -52,6 +54,17 @@ public abstract class INode implements C
 
   static final ReadOnlyList<INode> EMPTY_READ_ONLY_LIST
       = ReadOnlyList.Util.emptyList();
+  
+  /**
+   * Assert that the snapshot parameter must be null since this class only take
+   * care current state. Subclasses should override the methods for handling the
+   * snapshot states.
+   */
+  static void assertNull(Snapshot snapshot) {
+    if (snapshot != null) {
+      throw new AssertionError("snapshot is not null: " + snapshot);
+    }
+  }
 
   /** A pair of objects. */
   public static class Pair<L, R> {
@@ -144,9 +157,9 @@ public abstract class INode implements C
    * should not modify it.
    */
   private long permission = 0L;
-  protected INodeDirectory parent = null;
-  protected long modificationTime = 0L;
-  protected long accessTime = 0L;
+  INodeDirectory parent = null;
+  private long modificationTime = 0L;
+  private long accessTime = 0L;
 
   private INode(byte[] name, long permission, INodeDirectory parent,
       long modificationTime, long accessTime) {
@@ -173,8 +186,8 @@ public abstract class INode implements C
   
   /** @param other Other node to be copied */
   INode(INode other) {
-    this(other.getLocalNameBytes(), other.permission, other.getParent(), 
-        other.getModificationTime(), other.getAccessTime());
+    this(other.name, other.permission, other.parent, 
+        other.modificationTime, other.accessTime);
   }
 
   /**
@@ -186,7 +199,10 @@ public abstract class INode implements C
    *         may be replaced with a new inode for maintaining snapshot data.
    *         Then, the current inode is the new inode.
    */
-  public abstract Pair<? extends INode, ? extends INode> createSnapshotCopy();
+  public Pair<? extends INode, ? extends INode> createSnapshotCopy() {
+    throw new UnsupportedOperationException(getClass().getSimpleName()
+        + " does not support createSnapshotCopy().");
+  }
 
   /**
    * Check whether this is the root inode.
@@ -200,43 +216,92 @@ public abstract class INode implements C
     this.permission = that.permission;
   }
   /** Get the {@link PermissionStatus} */
+  public PermissionStatus getPermissionStatus(Snapshot snapshot) {
+    return new PermissionStatus(getUserName(snapshot), getGroupName(snapshot),
+        getFsPermission(snapshot));
+  }
+  /** The same as getPermissionStatus(null). */
   public PermissionStatus getPermissionStatus() {
-    return new PermissionStatus(getUserName(),getGroupName(),getFsPermission());
+    return getPermissionStatus(null);
   }
-  private void updatePermissionStatus(PermissionStatusFormat f, long n) {
+  private void updatePermissionStatus(PermissionStatusFormat f, long n,
+      Snapshot latest) {
+    recordModification(latest);
     permission = f.combine(n, permission);
   }
-  /** Get user name */
-  public String getUserName() {
+  /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return user name
+   */
+  public String getUserName(Snapshot snapshot) {
     int n = (int)PermissionStatusFormat.USER.retrieve(permission);
     return SerialNumberManager.INSTANCE.getUser(n);
   }
+  /** The same as getUserName(null). */
+  public String getUserName() {
+    return getUserName(null);
+  }
   /** Set user */
-  protected void setUser(String user) {
+  protected void setUser(String user, Snapshot latest) {
     int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
-    updatePermissionStatus(PermissionStatusFormat.USER, n);
+    updatePermissionStatus(PermissionStatusFormat.USER, n, latest);
   }
-  /** Get group name */
-  public String getGroupName() {
+  /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return group name
+   */
+  public String getGroupName(Snapshot snapshot) {
     int n = (int)PermissionStatusFormat.GROUP.retrieve(permission);
     return SerialNumberManager.INSTANCE.getGroup(n);
   }
+  /** The same as getGroupName(null). */
+  public String getGroupName() {
+    return getGroupName(null);
+  }
   /** Set group */
-  protected void setGroup(String group) {
+  protected void setGroup(String group, Snapshot latest) {
     int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
-    updatePermissionStatus(PermissionStatusFormat.GROUP, n);
+    updatePermissionStatus(PermissionStatusFormat.GROUP, n, latest);
   }
-  /** Get the {@link FsPermission} */
-  public FsPermission getFsPermission() {
+  /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return permission.
+   */
+  public FsPermission getFsPermission(Snapshot snapshot) {
     return new FsPermission(
         (short)PermissionStatusFormat.MODE.retrieve(permission));
   }
+  /** The same as getFsPermission(null). */
+  public FsPermission getFsPermission() {
+    return getFsPermission(null);
+  }
   protected short getFsPermissionShort() {
     return (short)PermissionStatusFormat.MODE.retrieve(permission);
   }
   /** Set the {@link FsPermission} of this {@link INode} */
-  void setPermission(FsPermission permission) {
-    updatePermissionStatus(PermissionStatusFormat.MODE, permission.toShort());
+  void setPermission(FsPermission permission, Snapshot latest) {
+    final short mode = permission.toShort();
+    updatePermissionStatus(PermissionStatusFormat.MODE, mode, latest);
+  }
+
+  /**
+   * This inode is being modified.  The previous version of the inode needs to
+   * be recorded in the latest snapshot.
+   *
+   * @param latest the latest snapshot that has been taken.
+   *        Note that it is null if no snapshots have been taken.
+   * @return see {@link #createSnapshotCopy()}. 
+   */
+  Pair<? extends INode, ? extends INode> recordModification(Snapshot latest) {
+    Preconditions.checkState(!isDirectory(),
+        "this is an INodeDirectory, this=%s", this);
+    return latest == null? null: parent.saveChild2Snapshot(this, latest);
   }
 
   /**
@@ -325,13 +390,13 @@ public abstract class INode implements C
    * Set local file name
    */
   public void setLocalName(String name) {
-    this.name = DFSUtil.string2Bytes(name);
+    setLocalName(DFSUtil.string2Bytes(name));
   }
 
   /**
    * Set local file name
    */
-  void setLocalName(byte[] name) {
+  public void setLocalName(byte[] name) {
     this.name = name;
   }
 
@@ -366,7 +431,7 @@ public abstract class INode implements C
    * Get parent directory 
    * @return parent INode
    */
-  INodeDirectory getParent() {
+  public INodeDirectory getParent() {
     return this.parent;
   }
 
@@ -375,19 +440,26 @@ public abstract class INode implements C
     this.parent = parent;
   }
   
-  /** 
-   * Get last modification time of inode.
-   * @return access time
+  /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return modification time.
    */
-  public long getModificationTime() {
+  public long getModificationTime(Snapshot snapshot) {
     return this.modificationTime;
   }
 
+  /** The same as getModificationTime(null). */
+  public long getModificationTime() {
+    return getModificationTime(null);
+  }
+
   /** Update modification time if it is larger than the current value. */
-  public void updateModificationTime(long modtime) {
+  public void updateModificationTime(long mtime, Snapshot latest) {
     assert isDirectory();
-    if (this.modificationTime <= modtime) {
-      this.modificationTime = modtime;
+    if (mtime > modificationTime) {
+      setModificationTime(mtime, latest);
     }
   }
 
@@ -398,22 +470,31 @@ public abstract class INode implements C
   /**
    * Always set the last modification time of inode.
    */
-  void setModificationTime(long modtime) {
+  public void setModificationTime(long modtime, Snapshot latest) {
+    recordModification(latest);
     this.modificationTime = modtime;
   }
 
   /**
-   * Get access time of inode.
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
    * @return access time
    */
-  public long getAccessTime() {
+  public long getAccessTime(Snapshot snapshot) {
     return accessTime;
   }
 
+  /** The same as getAccessTime(null). */
+  public long getAccessTime() {
+    return getAccessTime(null);
+  }
+
   /**
    * Set last access time of inode.
    */
-  void setAccessTime(long atime) {
+  void setAccessTime(long atime, Snapshot latest) {
+    recordModification(latest);
     accessTime = atime;
   }
 
@@ -483,16 +564,6 @@ public abstract class INode implements C
     return buf.toString();
   }
 
-  public boolean removeNode() {
-    if (parent == null) {
-      return false;
-    } else {
-      parent.removeChild(this);
-      parent = null;
-      return true;
-    }
-  }
-
   private static final byte[] EMPTY_BYTES = {};
 
   @Override
@@ -561,9 +632,9 @@ public abstract class INode implements C
    * @return a text representation of the tree.
    */
   @VisibleForTesting
-  public StringBuffer dumpTreeRecursively() {
+  public final StringBuffer dumpTreeRecursively() {
     final StringWriter out = new StringWriter(); 
-    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder());
+    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(), null);
     return out.getBuffer();
   }
 
@@ -572,14 +643,20 @@ public abstract class INode implements C
    * @param prefix The prefix string that each line should print.
    */
   @VisibleForTesting
-  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix, Snapshot snapshot) {
     out.print(prefix);
     out.print(" ");
     out.print(getLocalName());
     out.print("   (");
     out.print(getObjectString());
     out.print("), parent=");
-    out.println(parent == null? null: parent.getLocalName());
+    out.print(parent == null? null: parent.getLocalName() + "/");
+    if (!this.isDirectory()) {
+      out.println();
+    } else {
+      final INodeDirectory dir = (INodeDirectory)this;
+      out.println(", size=" + dir.getChildrenList(snapshot).size());
+    }
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Fri Dec 21 01:30:49 2012
@@ -91,12 +91,6 @@ public class INodeDirectory extends INod
     }
   }
 
-  @Override
-  public Pair<INodeDirectory, INodeDirectory> createSnapshotCopy() {
-    return new Pair<INodeDirectory, INodeDirectory>(this,
-        new INodeDirectory(this, false));
-  }
-  
   /** @return true unconditionally. */
   @Override
   public final boolean isDirectory() {
@@ -118,8 +112,9 @@ public class INodeDirectory extends INod
     return Collections.binarySearch(children, name);
   }
 
-  protected int searchChildrenForExistingINode(byte[] name) {
+  protected int searchChildrenForExistingINode(final INode inode) {
     assertChildrenNonNull();
+    final byte[] name = inode.getLocalNameBytes();
     final int i = searchChildren(name);
     if (i < 0) {
       throw new AssertionError("Child not found: name="
@@ -138,20 +133,87 @@ public class INodeDirectory extends INod
     return i >= 0? children.remove(i): null;
   }
 
-  /** Replace a child that has the same name as newChild by newChild.
+  /**
+   * Remove the specified child from this directory.
    * 
-   * @param newChild Child node to be added
+   * @param child the child inode to be removed
+   * @param latest See {@link INode#recordModification(Snapshot)}.
+   * @return the removed child inode.
    */
-  void replaceChild(INode newChild) {
+  public INode removeChild(INode child, Snapshot latest) {
     assertChildrenNonNull();
 
-    final int low = searchChildren(newChild.getLocalNameBytes());
-    if (low>=0) { // an old child exists so replace by the newChild
-      children.get(low).parent = null;
-      children.set(low, newChild);
-    } else {
-      throw new IllegalArgumentException("No child exists to be replaced");
+    if (latest != null) {
+      final INodeDirectoryWithSnapshot dir = replaceSelf4INodeDirectoryWithSnapshot(latest);
+      return dir.removeChild(child, latest);
     }
+
+    final int i = searchChildren(child.getLocalNameBytes());
+    return i >= 0? children.remove(i): null;
+  }
+
+  /**
+   * Replace itself with {@link INodeDirectoryWithQuota} or
+   * {@link INodeDirectoryWithSnapshot} depending on the latest snapshot.
+   */
+  INodeDirectoryWithQuota replaceSelf4Quota(final Snapshot latest,
+      final long nsQuota, final long dsQuota) {
+    Preconditions.checkState(!(this instanceof INodeDirectoryWithQuota),
+        "this is already an INodeDirectoryWithQuota, this=%s", this);
+
+    if (latest == null) {
+      final INodeDirectoryWithQuota q = new INodeDirectoryWithQuota(
+          this, true, nsQuota, dsQuota);
+      replaceSelf(q);
+      return q;
+    } else {
+      final INodeDirectoryWithSnapshot s
+          = INodeDirectoryWithSnapshot.newInstance(this, null);
+      s.setQuota(nsQuota, dsQuota, null);
+      replaceSelf(s);
+      s.save2Snapshot(latest, this);
+      return s;
+    }
+  }
+  /** Replace itself with an {@link INodeDirectorySnapshottable}. */
+  public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
+      Snapshot latest) {
+    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(this);
+    replaceSelf(s);
+    s.save2Snapshot(latest, this);
+    return s;
+  }
+
+  /** Replace itself with an {@link INodeDirectoryWithSnapshot}. */
+  public INodeDirectoryWithSnapshot replaceSelf4INodeDirectoryWithSnapshot(
+      Snapshot latest) {
+    Preconditions.checkState(!(this instanceof INodeDirectoryWithSnapshot),
+        "this is already an INodeDirectoryWithSnapshot, this=%s", this);
+
+    final INodeDirectoryWithSnapshot withSnapshot
+        = INodeDirectoryWithSnapshot.newInstance(this, latest);
+    replaceSelf(withSnapshot);
+    return withSnapshot;
+  }
+
+  /** Replace itself with {@link INodeDirectory}. */
+  public INodeDirectory replaceSelf4INodeDirectory() {
+    Preconditions.checkState(getClass() != INodeDirectory.class,
+        "the class is already INodeDirectory, this=%s", this);
+
+    final INodeDirectory newNode = new INodeDirectory(this, true);
+    replaceSelf(newNode);
+    return newNode;
+  }
+
+  /** Replace itself with the given directory. */
+  private final void replaceSelf(INodeDirectory newDir) {
+    final INodeDirectory parent = getParent();
+    Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
+
+    final int i = parent.searchChildrenForExistingINode(newDir);
+    final INode oldDir = parent.children.set(i, newDir);
+    oldDir.setParent(null);
   }
 
   /** Replace a child {@link INodeFile} with an {@link INodeFileWithLink}. */
@@ -161,12 +223,44 @@ public class INodeDirectory extends INod
         "Child file is already an INodeFileWithLink, child=" + child);
 
     final INodeFileWithLink newChild = new INodeFileWithLink(child);
-    final int i = searchChildrenForExistingINode(newChild.getLocalNameBytes());
+    final int i = searchChildrenForExistingINode(newChild);
     children.set(i, newChild);
     return newChild;
   }
 
-  private INode getChild(byte[] name, Snapshot snapshot) {
+  @Override
+  public Pair<? extends INode, ? extends INode> recordModification(Snapshot latest) {
+    if (latest == null) {
+      return null;
+    }
+    return replaceSelf4INodeDirectoryWithSnapshot(latest)
+        .save2Snapshot(latest, this);
+  }
+
+  /**
+   * Save the child to the latest snapshot.
+   * 
+   * @return a pair of inodes, where the left inode is the original child and
+   *         the right inode is the snapshot copy of the child; see also
+   *         {@link INode#createSnapshotCopy()}.
+   */
+  public Pair<? extends INode, ? extends INode> saveChild2Snapshot(
+      INode child, Snapshot latest) {
+    if (latest == null) {
+      return null;
+    }
+    return replaceSelf4INodeDirectoryWithSnapshot(latest)
+        .saveChild2Snapshot(child, latest);
+  }
+
+  /**
+   * @param name the name of the child
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current directory.
+   * @return the child inode.
+   */
+  public INode getChild(byte[] name, Snapshot snapshot) {
     final ReadOnlyList<INode> c = getChildrenList(snapshot);
     final int i = ReadOnlyList.Util.binarySearch(c, name);
     return i < 0? null: c.get(i);
@@ -307,10 +401,11 @@ public class INodeDirectory extends INod
       if (lastComp || !curNode.isDirectory()) {
         break;
       }
-      INodeDirectory parentDir = (INodeDirectory)curNode;
+      final INodeDirectory parentDir = (INodeDirectory)curNode;
+      final byte[] childName = components[count + 1];
       
       // check if the next byte[] in components is for ".snapshot"
-      if (isDotSnapshotDir(components[count + 1])
+      if (isDotSnapshotDir(childName)
           && (curNode instanceof INodeDirectorySnapshottable)) {
         // skip the ".snapshot" in components
         count++;
@@ -321,7 +416,7 @@ public class INodeDirectory extends INod
         }
         // check if ".snapshot" is the last element of components
         if (count == components.length - 1) {
-          return existing;
+          break;
         }
         // Resolve snapshot root
         final Snapshot s = ((INodeDirectorySnapshottable)parentDir).getSnapshot(
@@ -338,8 +433,7 @@ public class INodeDirectory extends INod
         }
       } else {
         // normal case, and also for resolving file/dir under snapshot root
-        curNode = parentDir.getChild(components[count + 1],
-            existing.getPathSnapshot());
+        curNode = parentDir.getChild(childName, existing.getPathSnapshot());
       }
       count++;
       index++;
@@ -382,11 +476,11 @@ public class INodeDirectory extends INod
    * @param name a child's name
    * @return the index of the next child
    */
-  int nextChild(byte[] name) {
+  static int nextChild(ReadOnlyList<INode> children, byte[] name) {
     if (name.length == 0) { // empty name
       return 0;
     }
-    int nextPos = Collections.binarySearch(children, name) + 1;
+    int nextPos = ReadOnlyList.Util.binarySearch(children, name) + 1;
     if (nextPos >= 0) {
       return nextPos;
     }
@@ -403,7 +497,13 @@ public class INodeDirectory extends INod
    * @return false if the child with this name already exists; 
    *         otherwise, return true;
    */
-  public boolean addChild(final INode node, final boolean setModTime) {
+  public boolean addChild(final INode node, final boolean setModTime,
+      final Snapshot latest) {
+    if (latest != null) {
+      final INodeDirectoryWithSnapshot dir = replaceSelf4INodeDirectoryWithSnapshot(latest);
+      return dir.addChild(node, setModTime, latest);
+    }
+
     if (children == null) {
       children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
     }
@@ -414,10 +514,11 @@ public class INodeDirectory extends INod
     node.parent = this;
     children.add(-low - 1, node);
     // update modification time of the parent directory
-    if (setModTime)
-      updateModificationTime(node.getModificationTime());
+    if (setModTime) {
+      updateModificationTime(node.getModificationTime(), latest);
+    }
     if (node.getGroupName() == null) {
-      node.setGroup(getGroupName());
+      node.setGroup(getGroupName(), latest);
     }
     return true;
   }
@@ -445,7 +546,7 @@ public class INodeDirectory extends INod
     final INodesInPath iip =  getExistingPathINodes(pathComponents, 2, false);
     final INodeDirectory parent = INodeDirectory.valueOf(iip.getINode(0),
         pathComponents);
-    return parent.addChild(newNode, true);
+    return parent.addChild(newNode, true, iip.getLatestSnapshot());
   }
 
   @Override
@@ -502,6 +603,7 @@ public class INodeDirectory extends INod
     return children == null ? EMPTY_READ_ONLY_LIST
         : ReadOnlyList.Util.asReadOnlyList(children);
   }
+
   /** Set the children list. */
   public void setChildren(List<INode> children) {
     this.children = children;
@@ -526,7 +628,7 @@ public class INodeDirectory extends INod
    * {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)}.
    * Contains INodes information resolved from a given path.
    */
-  static class INodesInPath {
+  public static class INodesInPath {
     private final byte[][] path;
     /**
      * Array with the specified number of INodes resolved for a given path.
@@ -727,13 +829,37 @@ public class INodeDirectory extends INod
   static final String DUMPTREE_LAST_ITEM = "\\-";
   @VisibleForTesting
   @Override
-  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
-    super.dumpTreeRecursively(out, prefix);
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
+      final Snapshot snapshot) {
+    super.dumpTreeRecursively(out, prefix, snapshot);
     if (prefix.length() >= 2) {
       prefix.setLength(prefix.length() - 2);
       prefix.append("  ");
     }
-    dumpTreeRecursively(out, prefix, children);
+    dumpTreeRecursively(out, prefix,
+        new Iterable<Pair<? extends INode, Snapshot>>() {
+      final Iterator<INode> i = getChildrenList(snapshot).iterator();
+      
+      @Override
+      public Iterator<Pair<? extends INode, Snapshot>> iterator() {
+        return new Iterator<Pair<? extends INode, Snapshot>>() {
+          @Override
+          public boolean hasNext() {
+            return i.hasNext();
+          }
+
+          @Override
+          public Pair<INode, Snapshot> next() {
+            return new Pair<INode, Snapshot>(i.next(), snapshot);
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    });
   }
 
   /**
@@ -743,12 +869,12 @@ public class INodeDirectory extends INod
    */
   @VisibleForTesting
   protected static void dumpTreeRecursively(PrintWriter out,
-      StringBuilder prefix, Iterable<? extends INode> subs) {
+      StringBuilder prefix, Iterable<Pair<? extends INode, Snapshot>> subs) {
     if (subs != null) {
-      for(final Iterator<? extends INode> i = subs.iterator(); i.hasNext();) {
-        final INode inode = i.next();
+      for(final Iterator<Pair<? extends INode, Snapshot>> i = subs.iterator(); i.hasNext();) {
+        final Pair<? extends INode, Snapshot> pair = i.next();
         prefix.append(i.hasNext()? DUMPTREE_EXCEPT_LAST_ITEM: DUMPTREE_LAST_ITEM);
-        inode.dumpTreeRecursively(out, prefix);
+        pair.left.dumpTreeRecursively(out, prefix, pair.right);
         prefix.setLength(prefix.length() - 2);
       }
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Fri Dec 21 01:30:49 2012
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 /**
  * Directory INode class that has a quota restriction
@@ -86,11 +87,11 @@ public class INodeDirectoryWithQuota ext
    * 
    * @param nsQuota Namespace quota to be set
    * @param dsQuota diskspace quota to be set
-   *                                
    */
-  void setQuota(long newNsQuota, long newDsQuota) {
-    nsQuota = newNsQuota;
-    dsQuota = newDsQuota;
+  public void setQuota(long nsQuota, long dsQuota, Snapshot latest) {
+    recordModification(latest);
+    this.nsQuota = nsQuota;
+    this.dsQuota = dsQuota;
   }
   
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Dec 21 01:30:49 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 /** I-node for closed file. */
 @InterfaceAudience.Private
@@ -126,8 +127,8 @@ public class INodeFile extends INode imp
    * the {@link FsAction#EXECUTE} action, if any, is ignored.
    */
   @Override
-  void setPermission(FsPermission permission) {
-    super.setPermission(permission.applyUMask(UMASK));
+  void setPermission(FsPermission permission, Snapshot latest) {
+    super.setPermission(permission.applyUMask(UMASK), latest);
   }
 
   /** @return the replication factor of the file. */
@@ -140,7 +141,15 @@ public class INodeFile extends INode imp
     return getFileReplication();
   }
 
-  protected void setFileReplication(short replication) {
+  protected void setFileReplication(short replication, Snapshot latest) {
+    if (latest != null) {
+      final Pair<? extends INode, ? extends INode> p = recordModification(latest);
+      if (p != null) {
+        ((INodeFile)p.left).setFileReplication(replication, null);
+        return;
+      }
+    }
+
     header = HeaderFormat.combineReplication(header, replication);
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Fri Dec 21 01:30:49 2012
@@ -34,7 +34,7 @@ public class INodeSymlink extends INode 
     this.symlink = DFSUtil.string2Bytes(value);
   }
   
-  public INodeSymlink(INodeSymlink that) {
+  INodeSymlink(INodeSymlink that) {
     super(that);
 
     //copy symlink

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1424782&r1=1424781&r2=1424782&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Fri Dec 21 01:30:49 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * Directories where taking snapshots is allowed.
@@ -41,10 +42,8 @@ import com.google.common.annotations.Vis
  */
 @InterfaceAudience.Private
 public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
-  static public INodeDirectorySnapshottable newInstance(
-      final INodeDirectory dir, final int snapshotQuota) {
-    return new INodeDirectorySnapshottable(dir, snapshotQuota);
-  }
+  /** Limit the number of snapshot per snapshottable directory. */
+  static final int SNAPSHOT_LIMIT = 1 << 16;
 
   /** Cast INode to INodeDirectorySnapshottable. */
   static public INodeDirectorySnapshottable valueOf(
@@ -57,18 +56,12 @@ public class INodeDirectorySnapshottable
     return (INodeDirectorySnapshottable)dir;
   }
 
-  /** Snapshots of this directory in ascending order of snapshot id. */
-  private final List<Snapshot> snapshots = new ArrayList<Snapshot>();
-  /** Snapshots of this directory in ascending order of snapshot names. */
-  private final List<Snapshot> snapshotsByNames = new ArrayList<Snapshot>();
-  
   /**
-   * @return {@link #snapshots}
+   * Snapshots of this directory in ascending order of snapshot names.
+   * Note that snapshots in ascending order of snapshot id are stored in
+   * {@link INodeDirectoryWithSnapshot}.diffs (a private field).
    */
-  @VisibleForTesting
-  List<Snapshot> getSnapshots() {
-    return snapshots;
-  }
+  private final List<Snapshot> snapshotsByNames = new ArrayList<Snapshot>();
 
   /**
    * @return {@link #snapshotsByNames}
@@ -79,16 +72,15 @@ public class INodeDirectorySnapshottable
   }
   
   /** Number of snapshots allowed. */
-  private int snapshotQuota;
+  private int snapshotQuota = SNAPSHOT_LIMIT;
 
-  private INodeDirectorySnapshottable(INodeDirectory dir,
-      final int snapshotQuota) {
-    super(dir, true);
-    setSnapshotQuota(snapshotQuota);
+  public INodeDirectorySnapshottable(INodeDirectory dir) {
+    super(dir, true, null);
   }
   
+  /** @return the number of existing snapshots. */
   public int getNumSnapshots() {
-    return snapshots.size();
+    return getSnapshotsByNames().size();
   }
   
   private int searchSnapshot(byte[] snapshotName) {
@@ -132,7 +124,7 @@ public class INodeDirectorySnapshottable
       }
       // remove the one with old name from snapshotsByNames
       Snapshot snapshot = snapshotsByNames.remove(indexOfOld);
-      INodeDirectoryWithSnapshot ssRoot = snapshot.getRoot();
+      final INodeDirectory ssRoot = snapshot.getRoot();
       ssRoot.setLocalName(newName);
       indexOfNew = -indexOfNew - 1;
       if (indexOfNew <= indexOfOld) {
@@ -143,12 +135,6 @@ public class INodeDirectorySnapshottable
     }
   }
 
-  /** @return the last snapshot. */
-  public Snapshot getLastSnapshot() {
-    final int n = snapshots.size();
-    return n == 0? null: snapshots.get(n - 1);
-  }
-
   public int getSnapshotQuota() {
     return snapshotQuota;
   }
@@ -169,9 +155,10 @@ public class INodeDirectorySnapshottable
   /** Add a snapshot. */
   Snapshot addSnapshot(int id, String name) throws SnapshotException {
     //check snapshot quota
-    if (snapshots.size() + 1 > snapshotQuota) {
+    final int n = getNumSnapshots();
+    if (n + 1 > snapshotQuota) {
       throw new SnapshotException("Failed to add snapshot: there are already "
-          + snapshots.size() + " snapshot(s) and the snapshot quota is "
+          + n + " snapshot(s) and the snapshot quota is "
           + snapshotQuota);
     }
     final Snapshot s = new Snapshot(id, name, this);
@@ -182,47 +169,91 @@ public class INodeDirectorySnapshottable
           + "snapshot with the same name \"" + name + "\".");
     }
 
-    snapshots.add(s);
+    addSnapshotDiff(s, this, true);
     snapshotsByNames.add(-i - 1, s);
 
     //set modification time
     final long timestamp = Time.now();
-    s.getRoot().updateModificationTime(timestamp);
-    updateModificationTime(timestamp);
+    s.getRoot().updateModificationTime(timestamp, null);
+    updateModificationTime(timestamp, null);
     return s;
   }
-  
-  @Override
-  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
-    super.dumpTreeRecursively(out, prefix);
 
-    out.print(prefix);
-    out.print(snapshots.size());
-    out.print(snapshots.size() <= 1 ? " snapshot of " : " snapshots of ");
-    out.println(getLocalName());
-
-    dumpTreeRecursively(out, prefix, new Iterable<INodeDirectoryWithSnapshot>() {
-      @Override
-      public Iterator<INodeDirectoryWithSnapshot> iterator() {
-        return new Iterator<INodeDirectoryWithSnapshot>() {
-          final Iterator<Snapshot> i = snapshots.iterator();
-
-          @Override
-          public boolean hasNext() {
-            return i.hasNext();
-          }
-
-          @Override
-          public INodeDirectoryWithSnapshot next() {
-            return i.next().getRoot();
-          }
-
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-        };
+  /**
+   * Replace itself with {@link INodeDirectoryWithSnapshot} or
+   * {@link INodeDirectory} depending on the latest snapshot.
+   */
+  void replaceSelf(final Snapshot latest) {
+    if (latest == null) {
+      Preconditions.checkState(getLastSnapshot() == null,
+          "latest == null but getLastSnapshot() != null, this=%s", this);
+      replaceSelf4INodeDirectory();
+    } else {
+      replaceSelf4INodeDirectoryWithSnapshot(latest).recordModification(latest);
+    }
+  }
+
+  @Override
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
+      Snapshot snapshot) {
+    super.dumpTreeRecursively(out, prefix, snapshot);
+
+    try {
+    if (snapshot == null) {
+      out.println();
+      out.print(prefix);
+      int n = 0;
+      for(SnapshotDiff diff : getSnapshotDiffs()) {
+        if (diff.isSnapshotRoot()) {
+          n++;
+        }
       }
-    });
+      out.print(n);
+      out.print(n <= 1 ? " snapshot of " : " snapshots of ");
+      final String name = getLocalName();
+      out.println(name.isEmpty()? "/": name);
+
+      dumpTreeRecursively(out, prefix, new Iterable<Pair<? extends INode, Snapshot>>() {
+        @Override
+        public Iterator<Pair<? extends INode, Snapshot>> iterator() {
+          return new Iterator<Pair<? extends INode, Snapshot>>() {
+            final Iterator<SnapshotDiff> i = getSnapshotDiffs().iterator();
+            private SnapshotDiff next = findNext();
+  
+            private SnapshotDiff findNext() {
+              for(; i.hasNext(); ) {
+                final SnapshotDiff diff = i.next();
+                if (diff.isSnapshotRoot()) {
+                  return diff;
+                }
+              }
+              return null;
+            }
+
+            @Override
+            public boolean hasNext() {
+              return next != null;
+            }
+  
+            @Override
+            public Pair<INodeDirectory, Snapshot> next() {
+              final Snapshot s = next.snapshot;
+              final Pair<INodeDirectory, Snapshot> pair =
+                  new Pair<INodeDirectory, Snapshot>(s.getRoot(), s);
+              next = findNext();
+              return pair;
+            }
+  
+            @Override
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      });
+    }
+    } catch(Exception e) {
+      throw new RuntimeException("this=" + this, e);
+    }
   }
 }



Mime
View raw message