hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject svn commit: r1608631 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/ src/test/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/
Date Tue, 08 Jul 2014 00:08:20 GMT
Author: wheat9
Date: Tue Jul  8 00:08:18 2014
New Revision: 1608631

URL: http://svn.apache.org/r1608631
Log:
HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable directory. Contributed by Jing Zhao.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSetQuotaWithSnapshot.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotRename.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jul  8 00:08:18 2014
@@ -129,6 +129,9 @@ Trunk (Unreleased)
 
     HDFS-6252. Phase out the old web UI in HDFS. (wheat9)
 
+    HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable
+    directory. (Jing Zhao via wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Jul  8 00:08:18 2014
@@ -71,7 +71,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.util.ByteArray;
@@ -91,7 +91,7 @@ import com.google.common.collect.Lists;
  **/
 @InterfaceAudience.Private
 public class FSDirectory implements Closeable {
-  private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
+  private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
         INodeDirectory.ROOT_NAME,
@@ -100,9 +100,9 @@ public class FSDirectory implements Clos
     r.addDirectoryWithQuotaFeature(
         DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA,
         DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA);
-    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
-    s.setSnapshotQuota(0);
-    return s;
+    r.addSnapshottableFeature();
+    r.setSnapshotQuota(0);
+    return r;
   }
 
   @VisibleForTesting
@@ -585,8 +585,7 @@ public class FSDirectory implements Clos
     }
 
     final INode dstInode = dstIIP.getLastINode();
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
+    List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
     if (dstInode != null) { // Destination exists
       validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
       checkSnapshot(dstInode, snapshottableDirs);
@@ -1112,8 +1111,7 @@ public class FSDirectory implements Clos
       if (!deleteAllowed(inodesInPath, src) ) {
         filesRemoved = -1;
       } else {
-        List<INodeDirectorySnapshottable> snapshottableDirs = 
-            new ArrayList<INodeDirectorySnapshottable>();
+        List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
         checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
         filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
             removedINodes, mtime);
@@ -1183,8 +1181,7 @@ public class FSDirectory implements Clos
         normalizePath(src), false);
     long filesRemoved = -1;
     if (deleteAllowed(inodesInPath, src)) {
-      List<INodeDirectorySnapshottable> snapshottableDirs = 
-          new ArrayList<INodeDirectorySnapshottable>();
+      List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
       checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
       filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
           removedINodes, mtime);
@@ -1260,19 +1257,20 @@ public class FSDirectory implements Clos
    *                          but do not have snapshots yet
    */
   private static void checkSnapshot(INode target,
-      List<INodeDirectorySnapshottable> snapshottableDirs) throws SnapshotException {
+      List<INodeDirectory> snapshottableDirs) throws SnapshotException {
     if (target.isDirectory()) {
       INodeDirectory targetDir = target.asDirectory();
-      if (targetDir.isSnapshottable()) {
-        INodeDirectorySnapshottable ssTargetDir = 
-            (INodeDirectorySnapshottable) targetDir;
-        if (ssTargetDir.getNumSnapshots() > 0) {
-          throw new SnapshotException("The directory " + ssTargetDir.getFullPathName()
-              + " cannot be deleted since " + ssTargetDir.getFullPathName()
+      DirectorySnapshottableFeature sf = targetDir
+          .getDirectorySnapshottableFeature();
+      if (sf != null) {
+        if (sf.getNumSnapshots() > 0) {
+          String fullPath = targetDir.getFullPathName();
+          throw new SnapshotException("The directory " + fullPath
+              + " cannot be deleted since " + fullPath
               + " is snapshottable and already has snapshots");
         } else {
           if (snapshottableDirs != null) {
-            snapshottableDirs.add(ssTargetDir);
+            snapshottableDirs.add(targetDir);
           }
         }
       } 
@@ -1359,14 +1357,18 @@ public class FSDirectory implements Clos
     Preconditions.checkArgument(
         src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
         "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-    
+
     final String dirPath = normalizePath(src.substring(0,
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    final INodeDirectorySnapshottable dirNode = INodeDirectorySnapshottable
-        .valueOf(node, dirPath);
-    final ReadOnlyList<Snapshot> snapshots = dirNode.getSnapshotList();
+    final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
+    final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
+    if (sf == null) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + dirPath);
+    }
+    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
     int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
     skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
     int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit);
@@ -1428,9 +1430,8 @@ public class FSDirectory implements Clos
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    if (node != null
-        && node.isDirectory()
-        && node.asDirectory() instanceof INodeDirectorySnapshottable) {
+    if (node != null && node.isDirectory()
+        && node.asDirectory().isSnapshottable()) {
       return node;
     }
     return null;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Jul  8 00:08:18 2014
@@ -58,7 +58,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
@@ -73,8 +72,8 @@ import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * Contains inner classes for reading or writing the on-disk format for
@@ -554,21 +553,17 @@ public class FSImageFormat {
       if (!toLoadSubtree) {
         return;
       }
-      
+
       // Step 2. Load snapshots if parent is snapshottable
       int numSnapshots = in.readInt();
       if (numSnapshots >= 0) {
-        final INodeDirectorySnapshottable snapshottableParent
-            = INodeDirectorySnapshottable.valueOf(parent, parent.getLocalName());
         // load snapshots and snapshotQuota
-        SnapshotFSImageFormat.loadSnapshotList(snapshottableParent,
-            numSnapshots, in, this);
-        if (snapshottableParent.getSnapshotQuota() > 0) {
+        SnapshotFSImageFormat.loadSnapshotList(parent, numSnapshots, in, this);
+        if (parent.getDirectorySnapshottableFeature().getSnapshotQuota() > 0) {
           // add the directory to the snapshottable directory list in 
           // SnapshotManager. Note that we only add root when its snapshot quota
           // is positive.
-          this.namesystem.getSnapshotManager().addSnapshottable(
-              snapshottableParent);
+          this.namesystem.getSnapshotManager().addSnapshottable(parent);
         }
       }
 
@@ -820,7 +815,10 @@ public class FSImageFormat {
       if (withSnapshot) {
         dir.addSnapshotFeature(null);
       }
-      return snapshottable ? new INodeDirectorySnapshottable(dir) : dir;
+      if (snapshottable) {
+        dir.addSnapshottableFeature();
+      }
+      return dir;
     } else if (numBlocks == -2) {
       //symlink
 
@@ -1367,10 +1365,8 @@ public class FSImageFormat {
 
       // 2. Write INodeDirectorySnapshottable#snapshotsByNames to record all
       // Snapshots
-      if (current instanceof INodeDirectorySnapshottable) {
-        INodeDirectorySnapshottable snapshottableNode =
-            (INodeDirectorySnapshottable) current;
-        SnapshotFSImageFormat.saveSnapshots(snapshottableNode, out);
+      if (current.isDirectory() && current.asDirectory().isSnapshottable()) {
+        SnapshotFSImageFormat.saveSnapshots(current.asDirectory(), out);
       } else {
         out.writeInt(-1); // # of snapshots
       }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Tue Jul  8 00:08:18 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 import org.apache.hadoop.hdfs.util.XMLUtils;
@@ -241,7 +240,7 @@ public class FSImageSerialization {
 
     writeQuota(node.getQuotaCounts(), out);
 
-    if (node instanceof INodeDirectorySnapshottable) {
+    if (node.isSnapshottable()) {
       out.writeBoolean(true);
     } else {
       out.writeBoolean(false);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jul  8 00:08:18 2014
@@ -217,7 +217,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -7580,7 +7579,7 @@ public class FSNamesystem implements Nam
    * Remove a list of INodeDirectorySnapshottable from the SnapshotManager
    * @param toRemove the list of INodeDirectorySnapshottable to be removed
    */
-  void removeSnapshottableDirs(List<INodeDirectorySnapshottable> toRemove) {
+  void removeSnapshottableDirs(List<INodeDirectory> toRemove) {
     if (snapshotManager != null) {
       snapshotManager.removeSnapshottable(toRemove);
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Tue Jul  8 00:08:18 2014
@@ -29,10 +29,11 @@ import org.apache.hadoop.fs.PathIsNotDir
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -102,11 +103,6 @@ public class INodeDirectory extends INod
     return this;
   }
 
-  /** Is this a snapshottable directory? */
-  public boolean isSnapshottable() {
-    return false;
-  }
-
   void setQuota(long nsQuota, long dsQuota) {
     DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
     if (quota != null) {
@@ -186,7 +182,7 @@ public class INodeDirectory extends INod
   public final boolean isWithSnapshot() {
     return getDirectoryWithSnapshotFeature() != null;
   }
-  
+
   public DirectoryDiffList getDiffs() {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     return sf != null ? sf.getDiffs() : null;
@@ -204,50 +200,71 @@ public class INodeDirectory extends INod
     return super.toDetailString() + (sf == null ? "" : ", " + sf.getDiffs()); 
   }
 
-  /** Replace itself with an {@link INodeDirectorySnapshottable}. */
-  public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
-      int latestSnapshotId, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    Preconditions.checkState(!(this instanceof INodeDirectorySnapshottable),
-        "this is already an INodeDirectorySnapshottable, this=%s", this);
-    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(this);
-    replaceSelf(s, inodeMap).getDirectoryWithSnapshotFeature().getDiffs()
-        .saveSelf2Snapshot(latestSnapshotId, s, this);
-    return s;
-  }
-
-  /** Replace itself with {@link INodeDirectory}. */
-  public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
-    Preconditions.checkState(getClass() != INodeDirectory.class,
-        "the class is already INodeDirectory, this=%s", this);
-    return replaceSelf(new INodeDirectory(this, true, this.getFeatures()),
-      inodeMap);
+  public DirectorySnapshottableFeature getDirectorySnapshottableFeature() {
+    return getFeature(DirectorySnapshottableFeature.class);
   }
 
-  /** Replace itself with the given directory. */
-  private final <N extends INodeDirectory> N replaceSelf(final N newDir,
-      final INodeMap inodeMap) {
-    final INodeReference ref = getParentReference();
-    if (ref != null) {
-      ref.setReferredINode(newDir);
-      if (inodeMap != null) {
-        inodeMap.put(newDir);
-      }
-    } else {
-      final INodeDirectory parent = getParent();
-      Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
-      parent.replaceChild(this, newDir, inodeMap);
+  public boolean isSnapshottable() {
+    return getDirectorySnapshottableFeature() != null;
+  }
+
+  public Snapshot getSnapshot(byte[] snapshotName) {
+    return getDirectorySnapshottableFeature().getSnapshot(snapshotName);
+  }
+
+  public void setSnapshotQuota(int snapshotQuota) {
+    getDirectorySnapshottableFeature().setSnapshotQuota(snapshotQuota);
+  }
+
+  public Snapshot addSnapshot(int id, String name) throws SnapshotException,
+      QuotaExceededException {
+    return getDirectorySnapshottableFeature().addSnapshot(this, id, name);
+  }
+
+  public Snapshot removeSnapshot(String snapshotName,
+      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
+      throws SnapshotException {
+    return getDirectorySnapshottableFeature().removeSnapshot(this,
+        snapshotName, collectedBlocks, removedINodes);
+  }
+
+  public void renameSnapshot(String path, String oldName, String newName)
+      throws SnapshotException {
+    getDirectorySnapshottableFeature().renameSnapshot(path, oldName, newName);
+  }
+
+  /** add DirectorySnapshottableFeature */
+  public void addSnapshottableFeature() {
+    Preconditions.checkState(!isSnapshottable(),
+        "this is already snapshottable, this=%s", this);
+    DirectoryWithSnapshotFeature s = this.getDirectoryWithSnapshotFeature();
+    final DirectorySnapshottableFeature snapshottable =
+        new DirectorySnapshottableFeature(s);
+    if (s != null) {
+      this.removeFeature(s);
     }
-    clear();
-    return newDir;
+    this.addFeature(snapshottable);
   }
-  
+
+  /** remove DirectorySnapshottableFeature */
+  public void removeSnapshottableFeature() {
+    DirectorySnapshottableFeature s = getDirectorySnapshottableFeature();
+    Preconditions.checkState(s != null,
+        "The dir does not have snapshottable feature: this=%s", this);
+    this.removeFeature(s);
+    if (s.getDiffs().asList().size() > 0) {
+      // add a DirectoryWithSnapshotFeature back
+      DirectoryWithSnapshotFeature sf = new DirectoryWithSnapshotFeature(
+          s.getDiffs());
+      addFeature(sf);
+    }
+  }
+
   /** 
    * Replace the given child with a new child. Note that we no longer need to
    * replace an normal INodeDirectory or INodeFile into an
    * INodeDirectoryWithSnapshot or INodeFileUnderConstruction. The only cases
-   * for child replacement is for {@link INodeDirectorySnapshottable} and 
-   * reference nodes.
+   * for child replacement is for reference nodes.
    */
   public void replaceChild(INode oldChild, final INode newChild,
       final INodeMap inodeMap) {
@@ -822,6 +839,11 @@ public class INodeDirectory extends INod
         };
       }
     });
+
+    final DirectorySnapshottableFeature s = getDirectorySnapshottableFeature();
+    if (s != null) {
+      s.dumpTreeRecursively(this, out, prefix, snapshot);
+    }
   }
 
   /**
@@ -830,7 +852,7 @@ public class INodeDirectory extends INod
    * @param subs The subtrees.
    */
   @VisibleForTesting
-  protected static void dumpTreeRecursively(PrintWriter out,
+  public static void dumpTreeRecursively(PrintWriter out,
       StringBuilder prefix, Iterable<SnapshotAndINode> subs) {
     if (subs != null) {
       for(final Iterator<SnapshotAndINode> i = subs.iterator(); i.hasNext();) {
@@ -843,7 +865,7 @@ public class INodeDirectory extends INod
   }
 
   /** A pair of Snapshot and INode objects. */
-  protected static class SnapshotAndINode {
+  public static class SnapshotAndINode {
     public final int snapshotId;
     public final INode inode;
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java Tue Jul  8 00:08:18 2014
@@ -318,8 +318,9 @@ public abstract class INodeWithAdditiona
   }
 
   protected <T extends Feature> T getFeature(Class<? extends Feature> clazz) {
+    Preconditions.checkArgument(clazz != null);
     for (Feature f : features) {
-      if (f.getClass() == clazz) {
+      if (clazz.isAssignableFrom(f.getClass())) {
         @SuppressWarnings("unchecked")
         T ret = (T) f;
         return ret;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java Tue Jul  8 00:08:18 2014
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.base.Preconditions;
@@ -208,8 +207,7 @@ public class INodesInPath {
       final byte[] childName = components[count + 1];
       
       // check if the next byte[] in components is for ".snapshot"
-      if (isDotSnapshotDir(childName)
-          && isDir && dir instanceof INodeDirectorySnapshottable) {
+      if (isDotSnapshotDir(childName) && isDir && dir.isSnapshottable()) {
         // skip the ".snapshot" in components
         count++;
         index++;
@@ -222,8 +220,7 @@ public class INodesInPath {
           break;
         }
         // Resolve snapshot root
-        final Snapshot s = ((INodeDirectorySnapshottable)dir).getSnapshot(
-            components[count + 1]);
+        final Snapshot s = dir.getSnapshot(components[count + 1]);
         if (s == null) {
           //snapshot not found
           curNode = null;

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java?rev=1608631&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java Tue Jul  8 00:08:18 2014
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.SnapshotAndINode;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
+import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.util.Diff.ListType;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A directory with this feature is a snapshottable directory, where snapshots
+ * can be taken. This feature extends {@link DirectoryWithSnapshotFeature}, and
+ * maintains extra information about all the snapshots taken on this directory.
+ */
+@InterfaceAudience.Private
+public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature {
+  /** Limit the number of snapshots per snapshottable directory. */
+  static final int SNAPSHOT_LIMIT = 1 << 16;
+
+  /**
+   * Snapshots of this directory in ascending order of snapshot names.
+   * Note that snapshots in ascending order of snapshot id are stored in
+   * the diff list of {@link DirectoryWithSnapshotFeature} (see getDiffs()).
+   */
+  private final List<Snapshot> snapshotsByNames = new ArrayList<Snapshot>();
+  /** Number of snapshots allowed. */
+  private int snapshotQuota = SNAPSHOT_LIMIT;
+
+  public DirectorySnapshottableFeature(DirectoryWithSnapshotFeature feature) {
+    super(feature == null ? null : feature.getDiffs());
+  }
+
+  /** @return the number of existing snapshots. */
+  public int getNumSnapshots() {
+    return snapshotsByNames.size();
+  }
+
+  private int searchSnapshot(byte[] snapshotName) {
+    return Collections.binarySearch(snapshotsByNames, snapshotName);
+  }
+
+  /** @return the snapshot with the given name. */
+  public Snapshot getSnapshot(byte[] snapshotName) {
+    final int i = searchSnapshot(snapshotName);
+    return i < 0? null: snapshotsByNames.get(i);
+  }
+
+  public Snapshot getSnapshotById(int sid) {
+    for (Snapshot s : snapshotsByNames) {
+      if (s.getId() == sid) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  /** @return {@link #snapshotsByNames} as a {@link ReadOnlyList} */
+  public ReadOnlyList<Snapshot> getSnapshotList() {
+    return ReadOnlyList.Util.asReadOnlyList(snapshotsByNames);
+  }
+
+  /**
+   * Rename a snapshot
+   * @param path
+   *          The directory path where the snapshot was taken. Used for
+   *          generating exception message.
+   * @param oldName
+   *          Old name of the snapshot
+   * @param newName
+   *          New name the snapshot will be renamed to
+   * @throws SnapshotException
+   *           Throw SnapshotException when either the snapshot with the old
+   *           name does not exist or a snapshot with the new name already
+   *           exists
+   */
+  public void renameSnapshot(String path, String oldName, String newName)
+      throws SnapshotException {
+    if (newName.equals(oldName)) {
+      return;
+    }
+    final int indexOfOld = searchSnapshot(DFSUtil.string2Bytes(oldName));
+    if (indexOfOld < 0) {
+      throw new SnapshotException("The snapshot " + oldName
+          + " does not exist for directory " + path);
+    } else {
+      final byte[] newNameBytes = DFSUtil.string2Bytes(newName);
+      int indexOfNew = searchSnapshot(newNameBytes);
+      if (indexOfNew >= 0) {
+        throw new SnapshotException("The snapshot " + newName
+            + " already exists for directory " + path);
+      }
+      // remove the one with old name from snapshotsByNames
+      Snapshot snapshot = snapshotsByNames.remove(indexOfOld);
+      final INodeDirectory ssRoot = snapshot.getRoot();
+      ssRoot.setLocalName(newNameBytes);
+      indexOfNew = -indexOfNew - 1;
+      if (indexOfNew <= indexOfOld) {
+        snapshotsByNames.add(indexOfNew, snapshot);
+      } else { // indexOfNew > indexOfOld
+        snapshotsByNames.add(indexOfNew - 1, snapshot);
+      }
+    }
+  }
+
+  public int getSnapshotQuota() {
+    return snapshotQuota;
+  }
+
+  public void setSnapshotQuota(int snapshotQuota) {
+    if (snapshotQuota < 0) {
+      throw new HadoopIllegalArgumentException(
+          "Cannot set snapshot quota to " + snapshotQuota + " < 0");
+    }
+    this.snapshotQuota = snapshotQuota;
+  }
+
+  /**
+   * Simply add a snapshot into the {@link #snapshotsByNames}. Used when loading
+   * fsimage.
+   */
+  void addSnapshot(Snapshot snapshot) {
+    this.snapshotsByNames.add(snapshot);
+  }
+
+  /** Add a snapshot. */
+  public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name)
+      throws SnapshotException, QuotaExceededException {
+    //check snapshot quota
+    final int n = getNumSnapshots();
+    if (n + 1 > snapshotQuota) {
+      throw new SnapshotException("Failed to add snapshot: there are already "
+          + n + " snapshot(s) and the snapshot quota is "
+          + snapshotQuota);
+    }
+    final Snapshot s = new Snapshot(id, name, snapshotRoot);
+    final byte[] nameBytes = s.getRoot().getLocalNameBytes();
+    final int i = searchSnapshot(nameBytes);
+    if (i >= 0) {
+      throw new SnapshotException("Failed to add snapshot: there is already a "
+          + "snapshot with the same name \"" + Snapshot.getSnapshotName(s) + "\".");
+    }
+
+    final DirectoryDiff d = getDiffs().addDiff(id, snapshotRoot);
+    d.setSnapshotRoot(s.getRoot());
+    snapshotsByNames.add(-i - 1, s);
+
+    // set modification time
+    final long now = Time.now();
+    snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID);
+    s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
+    return s;
+  }
+
+  /**
+   * Remove the snapshot with the given name from {@link #snapshotsByNames},
+   * and delete all the corresponding DirectoryDiff.
+   *
+   * @param snapshotRoot The directory where we take snapshots
+   * @param snapshotName The name of the snapshot to be removed
+   * @param collectedBlocks Used to collect information to update blocksMap
+   * @return The removed snapshot.
+   * @throws SnapshotException If no snapshot with the given name exists.
+   */
+  public Snapshot removeSnapshot(INodeDirectory snapshotRoot,
+      String snapshotName, BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) throws SnapshotException {
+    final int i = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
+    if (i < 0) {
+      throw new SnapshotException("Cannot delete snapshot " + snapshotName
+          + " from path " + snapshotRoot.getFullPathName()
+          + ": the snapshot does not exist.");
+    } else {
+      final Snapshot snapshot = snapshotsByNames.get(i);
+      int prior = Snapshot.findLatestSnapshot(snapshotRoot, snapshot.getId());
+      try {
+        Quota.Counts counts = snapshotRoot.cleanSubtree(snapshot.getId(),
+            prior, collectedBlocks, removedINodes, true);
+        INodeDirectory parent = snapshotRoot.getParent();
+        if (parent != null) {
+          // there will not be any WithName node corresponding to the deleted
+          // snapshot, thus only update the quota usage in the current tree
+          parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
+              -counts.get(Quota.DISKSPACE), true);
+        }
+      } catch(QuotaExceededException e) {
+        INode.LOG.error("BUG: removeSnapshot increases namespace usage.", e);
+      }
+      // remove from snapshotsByNames after successfully cleaning the subtree
+      snapshotsByNames.remove(i);
+      return snapshot;
+    }
+  }
+
+  public ContentSummaryComputationContext computeContentSummary(
+      final INodeDirectory snapshotRoot,
+      final ContentSummaryComputationContext summary) {
+    snapshotRoot.computeContentSummary(summary);
+    summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
+    summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
+    return summary;
+  }
+
+  /**
+   * Compute the difference between two snapshots (or a snapshot and the current
+   * directory) of the directory.
+   *
+   * @param snapshotRoot The directory where snapshots were taken.
+   * @param from The name of the start point of the comparison. Null or empty
+   *          indicating the current tree.
+   * @param to The name of the end point. Null or empty for the current tree.
+   * @return The difference between the start/end points, or null if both
+   *         points resolve to the same state.
+   * @throws SnapshotException If a non-empty name matches no snapshot.
+   */
+  SnapshotDiffInfo computeDiff(final INodeDirectory snapshotRoot,
+      final String from, final String to) throws SnapshotException {
+    Snapshot fromSnapshot = getSnapshotByName(snapshotRoot, from);
+    Snapshot toSnapshot = getSnapshotByName(snapshotRoot, to);
+    // Null-safe: compare resolved snapshots (null/"" both mean current tree)
+    if (fromSnapshot == toSnapshot) {
+      return null;
+    }
+    SnapshotDiffInfo diffs = new SnapshotDiffInfo(snapshotRoot, fromSnapshot,
+        toSnapshot);
+    computeDiffRecursively(snapshotRoot, snapshotRoot, new ArrayList<byte[]>(),
+        diffs);
+    return diffs;
+  }
+
+  /**
+   * Find the snapshot matching the given name.
+   *
+   * @param snapshotRoot The directory where snapshots were taken.
+   * @param snapshotName The name of the snapshot.
+   * @return The corresponding snapshot. Null if snapshotName is null or empty.
+   * @throws SnapshotException If snapshotName is not null or empty, but there
+   *           is no snapshot matching the name.
+   */
+  private Snapshot getSnapshotByName(INodeDirectory snapshotRoot,
+      String snapshotName) throws SnapshotException {
+    Snapshot s = null;
+    if (snapshotName != null && !snapshotName.isEmpty()) {
+      final int index = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
+      if (index < 0) {
+        throw new SnapshotException("Cannot find the snapshot of directory "
+            + snapshotRoot.getFullPathName() + " with name " + snapshotName);
+      }
+      s = snapshotsByNames.get(index);
+    }
+    return s;
+  }
+
+  /**
+   * Recursively compute the difference between snapshots under a given
+   * directory/file.
+   * @param snapshotRoot The directory where snapshots were taken.
+   * @param node The directory/file under which the diff is computed.
+   * @param parentPath Relative path (corresponding to the snapshot root) of
+   *                   the node's parent.
+   * @param diffReport data structure used to store the diff.
+   */
+  private void computeDiffRecursively(final INodeDirectory snapshotRoot,
+      INode node, List<byte[]> parentPath, SnapshotDiffInfo diffReport) {
+    final Snapshot earlierSnapshot = diffReport.isFromEarlier() ?
+        diffReport.getFrom() : diffReport.getTo();
+    final Snapshot laterSnapshot = diffReport.isFromEarlier() ?
+        diffReport.getTo() : diffReport.getFrom();
+    byte[][] relativePath = parentPath.toArray(new byte[parentPath.size()][]);
+    if (node.isDirectory()) {
+      final ChildrenDiff diff = new ChildrenDiff();
+      INodeDirectory dir = node.asDirectory();
+      DirectoryWithSnapshotFeature sf = dir.getDirectoryWithSnapshotFeature();
+      if (sf != null) {
+        boolean change = sf.computeDiffBetweenSnapshots(earlierSnapshot,
+            laterSnapshot, diff, dir);
+        if (change) {
+          diffReport.addDirDiff(dir, relativePath, diff);
+        }
+      }
+      ReadOnlyList<INode> children = dir.getChildrenList(earlierSnapshot
+          .getId());
+      for (INode child : children) {
+        final byte[] name = child.getLocalNameBytes();
+        boolean toProcess = diff.searchIndex(ListType.DELETED, name) < 0;
+        if (!toProcess && child instanceof INodeReference.WithName) {
+          byte[][] renameTargetPath = findRenameTargetPath(
+              snapshotRoot, (WithName) child,
+              laterSnapshot == null ? Snapshot.CURRENT_STATE_ID :
+                laterSnapshot.getId());
+          if (renameTargetPath != null) {
+            toProcess = true;
+            diffReport.setRenameTarget(child.getId(), renameTargetPath);
+          }
+        }
+        if (toProcess) {
+          parentPath.add(name);
+          computeDiffRecursively(snapshotRoot, child, parentPath, diffReport);
+          parentPath.remove(parentPath.size() - 1);
+        }
+      }
+    } else if (node.isFile() && node.asFile().isWithSnapshot()) {
+      INodeFile file = node.asFile();
+      boolean change = file.getFileWithSnapshotFeature()
+          .changedBetweenSnapshots(file, earlierSnapshot, laterSnapshot);
+      if (change) {
+        diffReport.addFileDiff(file, relativePath);
+      }
+    }
+  }
+
+  /**
+   * We just found a deleted WithName node as the source of a rename operation.
+   * However, we should include it in our snapshot diff report as rename only
+   * if the rename target is also under the same snapshottable directory.
+   */
+  private byte[][] findRenameTargetPath(final INodeDirectory snapshotRoot,
+      INodeReference.WithName wn, final int snapshotId) {
+    INode inode = wn.getReferredINode();
+    final LinkedList<byte[]> ancestors = Lists.newLinkedList();
+    while (inode != null) {
+      if (inode == snapshotRoot) {
+        return ancestors.toArray(new byte[ancestors.size()][]);
+      }
+      if (inode instanceof INodeReference.WithCount) {
+        inode = ((WithCount) inode).getParentRef(snapshotId);
+      } else {
+        INode parent = inode.getParentReference() != null ? inode
+            .getParentReference() : inode.getParent();
+        if (parent != null && parent instanceof INodeDirectory) {
+          int sid = parent.asDirectory().searchChild(inode);
+          if (sid < snapshotId) {
+            return null;
+          }
+        }
+        if (!(parent instanceof WithCount)) {
+          ancestors.addFirst(inode.getLocalNameBytes());
+        }
+        inode = parent;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return "snapshotsByNames=" + snapshotsByNames;
+  }
+
+  @VisibleForTesting
+  public void dumpTreeRecursively(INodeDirectory snapshotRoot, PrintWriter out,
+      StringBuilder prefix, int snapshot) {
+    if (snapshot == Snapshot.CURRENT_STATE_ID) {
+      out.println();
+      out.print(prefix);
+
+      out.print("Snapshot of ");
+      final String name = snapshotRoot.getLocalName();
+      out.print(name.isEmpty()? "/": name);
+      out.print(": quota=");
+      out.print(getSnapshotQuota());
+
+      int n = 0;
+      for(DirectoryDiff diff : getDiffs()) {
+        if (diff.isSnapshotRoot()) {
+          n++;
+        }
+      }
+      Preconditions.checkState(n == snapshotsByNames.size(), "#n=" + n
+          + ", snapshotsByNames.size()=" + snapshotsByNames.size());
+      out.print(", #snapshot=");
+      out.println(n);
+
+      INodeDirectory.dumpTreeRecursively(out, prefix,
+          new Iterable<SnapshotAndINode>() {
+        @Override
+        public Iterator<SnapshotAndINode> iterator() {
+          return new Iterator<SnapshotAndINode>() {
+            final Iterator<DirectoryDiff> i = getDiffs().iterator();
+            private DirectoryDiff next = findNext();
+
+            private DirectoryDiff findNext() {
+              for(; i.hasNext(); ) {
+                final DirectoryDiff diff = i.next();
+                if (diff.isSnapshotRoot()) {
+                  return diff;
+                }
+              }
+              return null;
+            }
+
+            @Override
+            public boolean hasNext() {
+              return next != null;
+            }
+
+            @Override
+            public SnapshotAndINode next() {
+              final SnapshotAndINode pair = new SnapshotAndINode(next
+                  .getSnapshotId(), getSnapshotById(next.getSnapshotId())
+                  .getRoot());
+              next = findNext();
+              return pair;
+            }
+
+            @Override
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      });
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java Tue Jul  8 00:08:18 2014
@@ -48,7 +48,9 @@ import org.apache.hadoop.hdfs.util.ReadO
 import com.google.common.base.Preconditions;
 
 /**
- * Feature for directory with snapshot-related information.
+ * Feature used to store and process the snapshot diff information for a
+ * directory. In particular, it contains a directory diff list recording changes
+ * made to the directory and its children for each snapshot.
  */
 @InterfaceAudience.Private
 public class DirectoryWithSnapshotFeature implements INode.Feature {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java Tue Jul  8 00:08:18 2014
@@ -127,9 +127,8 @@ public class FSImageFormatPBSnapshot {
     }
 
     /**
-     * Load the snapshots section from fsimage. Also convert snapshottable
-     * directories into {@link INodeDirectorySnapshottable}.
-     *
+     * Load the snapshots section from fsimage. Also add snapshottable feature
+     * to snapshottable directories.
      */
     public void loadSnapshotSection(InputStream in) throws IOException {
       SnapshotManager sm = fsn.getSnapshotManager();
@@ -139,16 +138,13 @@ public class FSImageFormatPBSnapshot {
       sm.setSnapshotCounter(section.getSnapshotCounter());
       for (long sdirId : section.getSnapshottableDirList()) {
         INodeDirectory dir = fsDir.getInode(sdirId).asDirectory();
-        final INodeDirectorySnapshottable sdir;
         if (!dir.isSnapshottable()) {
-          sdir = new INodeDirectorySnapshottable(dir);
-          fsDir.addToInodeMap(sdir);
+          dir.addSnapshottableFeature();
         } else {
           // dir is root, and admin set root to snapshottable before
-          sdir = (INodeDirectorySnapshottable) dir;
-          sdir.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+          dir.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
         }
-        sm.addSnapshottable(sdir);
+        sm.addSnapshottable(dir);
       }
       loadSnapshots(in, snum);
     }
@@ -160,12 +156,11 @@ public class FSImageFormatPBSnapshot {
         INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
             parent.getLoaderContext());
         int sid = pbs.getSnapshotId();
-        INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
-            .getInode(root.getId()).asDirectory();
+        INodeDirectory parent = fsDir.getInode(root.getId()).asDirectory();
         Snapshot snapshot = new Snapshot(sid, root, parent);
         // add the snapshot to parent, since we follow the sequence of
         // snapshotsByNames when saving, we do not need to sort when loading
-        parent.addSnapshot(snapshot);
+        parent.getDirectorySnapshottableFeature().addSnapshot(snapshot);
         snapshotMap.put(sid, snapshot);
       }
     }
@@ -373,14 +368,15 @@ public class FSImageFormatPBSnapshot {
           .setSnapshotCounter(sm.getSnapshotCounter())
           .setNumSnapshots(sm.getNumSnapshots());
 
-      INodeDirectorySnapshottable[] snapshottables = sm.getSnapshottableDirs();
-      for (INodeDirectorySnapshottable sdir : snapshottables) {
+      INodeDirectory[] snapshottables = sm.getSnapshottableDirs();
+      for (INodeDirectory sdir : snapshottables) {
         b.addSnapshottableDir(sdir.getId());
       }
       b.build().writeDelimitedTo(out);
       int i = 0;
-      for(INodeDirectorySnapshottable sdir : snapshottables) {
-        for(Snapshot s : sdir.getSnapshotsByNames()) {
+      for(INodeDirectory sdir : snapshottables) {
+        for (Snapshot s : sdir.getDirectorySnapshottableFeature()
+            .getSnapshotList()) {
           Root sroot = s.getRoot();
           SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
               .newBuilder().setSnapshotId(s.getId());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Tue Jul  8 00:08:18 2014
@@ -184,15 +184,14 @@ public class Snapshot implements Compara
   /** The root directory of the snapshot. */
   private final Root root;
 
-  Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
+  Snapshot(int id, String name, INodeDirectory dir) {
     this(id, dir, dir);
     this.root.setLocalName(DFSUtil.string2Bytes(name));
   }
 
-  Snapshot(int id, INodeDirectory dir, INodeDirectorySnapshottable parent) {
+  Snapshot(int id, INodeDirectory dir, INodeDirectory parent) {
     this.id = id;
     this.root = new Root(dir);
-
     this.root.setParent(parent);
   }
   

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java Tue Jul  8 00:08:18 2014
@@ -99,7 +99,7 @@ class SnapshotDiffInfo {
   }
 
   /** The root directory of the snapshots */
-  private final INodeDirectorySnapshottable snapshotRoot;
+  private final INodeDirectory snapshotRoot;
   /** The starting point of the difference */
   private final Snapshot from;
   /** The end point of the difference */
@@ -122,8 +122,8 @@ class SnapshotDiffInfo {
   private final Map<Long, RenameEntry> renameMap =
       new HashMap<Long, RenameEntry>();
 
-  SnapshotDiffInfo(INodeDirectorySnapshottable snapshotRoot, Snapshot start,
-      Snapshot end) {
+  SnapshotDiffInfo(INodeDirectory snapshotRoot, Snapshot start, Snapshot end) {
+    Preconditions.checkArgument(snapshotRoot.isSnapshottable());
     this.snapshotRoot = snapshotRoot;
     this.from = start;
     this.to = end;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java Tue Jul  8 00:08:18 2014
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.tools.snap
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A helper class defining static methods for reading/writing snapshot related
  * information from/to FSImage.
@@ -52,17 +54,19 @@ public class SnapshotFSImageFormat {
    * @param out The {@link DataOutput} to write.
    * @throws IOException
    */
-  public static void saveSnapshots(INodeDirectorySnapshottable current,
-      DataOutput out) throws IOException {
+  public static void saveSnapshots(INodeDirectory current, DataOutput out)
+      throws IOException {
+    DirectorySnapshottableFeature sf = current.getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
     // list of snapshots in snapshotsByNames
-    ReadOnlyList<Snapshot> snapshots = current.getSnapshotsByNames();
+    ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
     out.writeInt(snapshots.size());
     for (Snapshot s : snapshots) {
       // write the snapshot id
       out.writeInt(s.getId());
     }
     // snapshot quota
-    out.writeInt(current.getSnapshotQuota());
+    out.writeInt(sf.getSnapshotQuota());
   }
 
   /**
@@ -216,19 +220,22 @@ public class SnapshotFSImageFormat {
    * @param loader
    *          The loader
    */
-  public static void loadSnapshotList(
-      INodeDirectorySnapshottable snapshottableParent, int numSnapshots,
-      DataInput in, FSImageFormat.Loader loader) throws IOException {
+  public static void loadSnapshotList(INodeDirectory snapshottableParent,
+      int numSnapshots, DataInput in, FSImageFormat.Loader loader)
+      throws IOException {
+    DirectorySnapshottableFeature sf = snapshottableParent
+        .getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
     for (int i = 0; i < numSnapshots; i++) {
       // read snapshots
       final Snapshot s = loader.getSnapshot(in);
       s.getRoot().setParent(snapshottableParent);
-      snapshottableParent.addSnapshot(s);
+      sf.addSnapshot(s);
     }
     int snapshotQuota = in.readInt();
     snapshottableParent.setSnapshotQuota(snapshotQuota);
   }
-  
+
   /**
    * Load the {@link SnapshotDiff} list for the INodeDirectoryWithSnapshot
    * directory.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Tue Jul  8 00:08:18 2014
@@ -44,6 +44,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.metrics2.util.MBeans;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Manage snapshottable directories and their snapshots.
  * 
@@ -66,8 +68,8 @@ public class SnapshotManager implements 
   private int snapshotCounter = 0;
   
   /** All snapshottable directories in the namesystem. */
-  private final Map<Long, INodeDirectorySnapshottable> snapshottables
-      = new HashMap<Long, INodeDirectorySnapshottable>();
+  private final Map<Long, INodeDirectory> snapshottables =
+      new HashMap<Long, INodeDirectory>();
 
   public SnapshotManager(final FSDirectory fsdir) {
     this.fsdir = fsdir;
@@ -84,7 +86,7 @@ public class SnapshotManager implements 
       return;
     }
 
-    for(INodeDirectorySnapshottable s : snapshottables.values()) {
+    for(INodeDirectory s : snapshottables.values()) {
       if (s.isAncestorDirectory(dir)) {
         throw new SnapshotException(
             "Nested snapshottable directories not allowed: path=" + path
@@ -112,33 +114,30 @@ public class SnapshotManager implements 
       checkNestedSnapshottable(d, path);
     }
 
-
-    final INodeDirectorySnapshottable s;
     if (d.isSnapshottable()) {
       //The directory is already a snapshottable directory.
-      s = (INodeDirectorySnapshottable)d; 
-      s.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+      d.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
     } else {
-      s = d.replaceSelf4INodeDirectorySnapshottable(iip.getLatestSnapshotId(),
-          fsdir.getINodeMap());
+      d.addSnapshottableFeature();
     }
-    addSnapshottable(s);
+    addSnapshottable(d);
   }
   
   /** Add the given snapshottable directory to {@link #snapshottables}. */
-  public void addSnapshottable(INodeDirectorySnapshottable dir) {
+  public void addSnapshottable(INodeDirectory dir) {
+    Preconditions.checkArgument(dir.isSnapshottable());
     snapshottables.put(dir.getId(), dir);
   }
 
   /** Remove the given snapshottable directory from {@link #snapshottables}. */
-  private void removeSnapshottable(INodeDirectorySnapshottable s) {
+  private void removeSnapshottable(INodeDirectory s) {
     snapshottables.remove(s.getId());
   }
   
   /** Remove snapshottable directories from {@link #snapshottables} */
-  public void removeSnapshottable(List<INodeDirectorySnapshottable> toRemove) {
+  public void removeSnapshottable(List<INodeDirectory> toRemove) {
     if (toRemove != null) {
-      for (INodeDirectorySnapshottable s : toRemove) {
+      for (INodeDirectory s : toRemove) {
         removeSnapshottable(s);
       }
     }
@@ -152,22 +151,22 @@ public class SnapshotManager implements 
   public void resetSnapshottable(final String path) throws IOException {
     final INodesInPath iip = fsdir.getINodesInPath4Write(path);
     final INodeDirectory d = INodeDirectory.valueOf(iip.getLastINode(), path);
-    if (!d.isSnapshottable()) {
+    DirectorySnapshottableFeature sf = d.getDirectorySnapshottableFeature();
+    if (sf == null) {
       // the directory is already non-snapshottable
       return;
     }
-    final INodeDirectorySnapshottable s = (INodeDirectorySnapshottable) d;
-    if (s.getNumSnapshots() > 0) {
+    if (sf.getNumSnapshots() > 0) {
       throw new SnapshotException("The directory " + path + " has snapshot(s). "
           + "Please redo the operation after removing all the snapshots.");
     }
 
-    if (s == fsdir.getRoot()) {
-      s.setSnapshotQuota(0); 
+    if (d == fsdir.getRoot()) {
+      d.setSnapshotQuota(0);
     } else {
-      s.replaceSelf(iip.getLatestSnapshotId(), fsdir.getINodeMap());
+      d.removeSnapshottableFeature();
     }
-    removeSnapshottable(s);
+    removeSnapshottable(d);
   }
 
   /**
@@ -180,10 +179,15 @@ public class SnapshotManager implements 
   *           Throw IOException when the given path does not lead to an
   *           existing snapshottable directory.
   */
-  public INodeDirectorySnapshottable getSnapshottableRoot(final String path
-      ) throws IOException {
-    final INodesInPath i = fsdir.getINodesInPath4Write(path);
-    return INodeDirectorySnapshottable.valueOf(i.getLastINode(), path);
+  public INodeDirectory getSnapshottableRoot(final String path)
+      throws IOException {
+    final INodeDirectory dir = INodeDirectory.valueOf(fsdir
+        .getINodesInPath4Write(path).getLastINode(), path);
+    if (!dir.isSnapshottable()) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + path);
+    }
+    return dir;
   }
 
   /**
@@ -202,7 +206,7 @@ public class SnapshotManager implements 
    */
   public String createSnapshot(final String path, String snapshotName
       ) throws IOException {
-    INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+    INodeDirectory srcRoot = getSnapshottableRoot(path);
 
     if (snapshotCounter == getMaxSnapshotID()) {
       // We have reached the maximum allowable snapshot ID and since we don't
@@ -235,7 +239,7 @@ public class SnapshotManager implements 
     // parse the path, and check if the path is a snapshot path
     // the INodeDirectorySnapshottable#valueOf method will throw Exception 
     // if the path is not for a snapshottable directory
-    INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+    INodeDirectory srcRoot = getSnapshottableRoot(path);
     srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes);
     numSnapshots.getAndDecrement();
   }
@@ -258,8 +262,7 @@ public class SnapshotManager implements 
       final String newSnapshotName) throws IOException {
     // Find the source root directory path where the snapshot was taken.
     // All the check for path has been included in the valueOf method.
-    final INodeDirectorySnapshottable srcRoot
-        = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path);
+    final INodeDirectory srcRoot = getSnapshottableRoot(path);
     // Note that renameSnapshot and createSnapshot are synchronized externally
     // through FSNamesystem's write lock
     srcRoot.renameSnapshot(path, oldSnapshotName, newSnapshotName);
@@ -285,9 +288,9 @@ public class SnapshotManager implements 
     snapshotCounter = counter;
   }
 
-  INodeDirectorySnapshottable[] getSnapshottableDirs() {
+  INodeDirectory[] getSnapshottableDirs() {
     return snapshottables.values().toArray(
-        new INodeDirectorySnapshottable[snapshottables.size()]);
+        new INodeDirectory[snapshottables.size()]);
   }
 
   /**
@@ -299,8 +302,9 @@ public class SnapshotManager implements 
     out.writeInt(numSnapshots.get());
 
     // write all snapshots.
-    for(INodeDirectorySnapshottable snapshottableDir : snapshottables.values()) {
-      for(Snapshot s : snapshottableDir.getSnapshotsByNames()) {
+    for(INodeDirectory snapshottableDir : snapshottables.values()) {
+      for (Snapshot s : snapshottableDir.getDirectorySnapshottableFeature()
+          .getSnapshotList()) {
         s.write(out);
       }
     }
@@ -339,16 +343,16 @@ public class SnapshotManager implements 
     
     List<SnapshottableDirectoryStatus> statusList = 
         new ArrayList<SnapshottableDirectoryStatus>();
-    for (INodeDirectorySnapshottable dir : snapshottables.values()) {
+    for (INodeDirectory dir : snapshottables.values()) {
       if (userName == null || userName.equals(dir.getUserName())) {
         SnapshottableDirectoryStatus status = new SnapshottableDirectoryStatus(
             dir.getModificationTime(), dir.getAccessTime(),
             dir.getFsPermission(), dir.getUserName(), dir.getGroupName(),
             dir.getLocalNameBytes(), dir.getId(), 
             dir.getChildrenNum(Snapshot.CURRENT_STATE_ID),
-            dir.getNumSnapshots(),
-            dir.getSnapshotQuota(), dir.getParent() == null ? 
-                DFSUtil.EMPTY_BYTES : 
+            dir.getDirectorySnapshottableFeature().getNumSnapshots(),
+            dir.getDirectorySnapshottableFeature().getSnapshotQuota(),
+            dir.getParent() == null ? DFSUtil.EMPTY_BYTES :
                 DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
         statusList.add(status);
       }
@@ -364,20 +368,18 @@ public class SnapshotManager implements 
    */
   public SnapshotDiffReport diff(final String path, final String from,
       final String to) throws IOException {
+    // Find the source root directory path where the snapshots were taken.
+    // All the checks on the path are performed by getSnapshottableRoot.
+    final INodeDirectory snapshotRoot = getSnapshottableRoot(path);
+
     if ((from == null || from.isEmpty())
         && (to == null || to.isEmpty())) {
       // both fromSnapshot and toSnapshot indicate the current tree
       return new SnapshotDiffReport(path, from, to,
           Collections.<DiffReportEntry> emptyList());
     }
-
-    // Find the source root directory path where the snapshots were taken.
-    // All the check for path has been included in the valueOf method.
-    INodesInPath inodesInPath = fsdir.getINodesInPath4Write(path.toString());
-    final INodeDirectorySnapshottable snapshotRoot = INodeDirectorySnapshottable
-        .valueOf(inodesInPath.getLastINode(), path);
-
-    final SnapshotDiffInfo diffs = snapshotRoot.computeDiff(from, to);
+    final SnapshotDiffInfo diffs = snapshotRoot
+        .getDirectorySnapshottableFeature().computeDiff(snapshotRoot, from, to);
     return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
         path, from, to, Collections.<DiffReportEntry> emptyList());
   }
@@ -412,7 +414,7 @@ public class SnapshotManager implements 
     getSnapshottableDirectories() {
     List<SnapshottableDirectoryStatus.Bean> beans =
         new ArrayList<SnapshottableDirectoryStatus.Bean>();
-    for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
+    for (INodeDirectory d : getSnapshottableDirs()) {
       beans.add(toBean(d));
     }
     return beans.toArray(new SnapshottableDirectoryStatus.Bean[beans.size()]);
@@ -421,20 +423,19 @@ public class SnapshotManager implements 
   @Override // SnapshotStatsMXBean
   public SnapshotInfo.Bean[] getSnapshots() {
     List<SnapshotInfo.Bean> beans = new ArrayList<SnapshotInfo.Bean>();
-    for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
-      for (Snapshot s : d.getSnapshotList()) {
+    for (INodeDirectory d : getSnapshottableDirs()) {
+      for (Snapshot s : d.getDirectorySnapshottableFeature().getSnapshotList()) {
         beans.add(toBean(s));
       }
     }
     return beans.toArray(new SnapshotInfo.Bean[beans.size()]);
   }
 
-  public static SnapshottableDirectoryStatus.Bean toBean(
-      INodeDirectorySnapshottable d) {
+  public static SnapshottableDirectoryStatus.Bean toBean(INodeDirectory d) {
     return new SnapshottableDirectoryStatus.Bean(
         d.getFullPathName(),
-        d.getNumSnapshots(),
-        d.getSnapshotQuota(),
+        d.getDirectorySnapshottableFeature().getNumSnapshots(),
+        d.getDirectorySnapshottableFeature().getSnapshotQuota(),
         d.getModificationTime(),
         Short.valueOf(Integer.toOctalString(
             d.getFsPermissionShort())),

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java Tue Jul  8 00:08:18 2014
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.util.Canceler;
@@ -194,8 +193,8 @@ public class TestFSImageWithSnapshot {
     fsn = cluster.getNamesystem();
     hdfs = cluster.getFileSystem();
     
-    INodeDirectorySnapshottable rootNode = 
-        (INodeDirectorySnapshottable) fsn.dir.getINode4Write(root.toString());
+    INodeDirectory rootNode = fsn.dir.getINode4Write(root.toString())
+        .asDirectory();
     assertTrue("The children list of root should be empty", 
         rootNode.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty());
     // one snapshot on root: s1

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java Tue Jul  8 00:08:18 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -90,22 +89,20 @@ public class TestSnapshotPathINodes {
     final INode before = fsdir.getINode(pathStr);
     
     // Before a directory is snapshottable
-    Assert.assertTrue(before instanceof INodeDirectory);
-    Assert.assertFalse(before instanceof INodeDirectorySnapshottable);
+    Assert.assertFalse(before.asDirectory().isSnapshottable());
 
     // After a directory is snapshottable
     final Path path = new Path(pathStr);
     hdfs.allowSnapshot(path);
     {
       final INode after = fsdir.getINode(pathStr);
-      Assert.assertTrue(after instanceof INodeDirectorySnapshottable);
+      Assert.assertTrue(after.asDirectory().isSnapshottable());
     }
     
     hdfs.disallowSnapshot(path);
     {
       final INode after = fsdir.getINode(pathStr);
-      Assert.assertTrue(after instanceof INodeDirectory);
-      Assert.assertFalse(after instanceof INodeDirectorySnapshottable);
+      Assert.assertFalse(after.asDirectory().isSnapshottable());
     }
   }
   
@@ -115,8 +112,7 @@ public class TestSnapshotPathINodes {
     }
     final int i = inodesInPath.getSnapshotRootIndex() - 1;
     final INode inode = inodesInPath.getINodes()[i];
-    return ((INodeDirectorySnapshottable)inode).getSnapshot(
-        DFSUtil.string2Bytes(name)); 
+    return inode.asDirectory().getSnapshot(DFSUtil.string2Bytes(name));
   }
 
   static void assertSnapshot(INodesInPath inodesInPath, boolean isSnapshot,

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java Tue Jul  8 00:08:18 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.log4j.Level;
@@ -153,8 +154,7 @@ public class TestINodeFileUnderConstruct
     // deleted list, with size BLOCKSIZE*2
     INodeFile fileNode = (INodeFile) fsdir.getINode(file.toString());
     assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize());
-    INodeDirectorySnapshottable dirNode = (INodeDirectorySnapshottable) fsdir
-        .getINode(dir.toString());
+    INodeDirectory dirNode = fsdir.getINode(dir.toString()).asDirectory();
     DirectoryDiff last = dirNode.getDiffs().getLast();
     
     // 2. append without closing stream
@@ -162,7 +162,7 @@ public class TestINodeFileUnderConstruct
     out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
     
     // re-check nodeInDeleted_S0
-    dirNode = (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    dirNode = fsdir.getINode(dir.toString()).asDirectory();
     assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize(last.getSnapshotId()));
     
     // 3. take snapshot --> close stream
@@ -172,7 +172,7 @@ public class TestINodeFileUnderConstruct
     // check: an INodeFileUnderConstructionWithSnapshot with size BLOCKSIZE*3 should
     // have been stored in s1's deleted list
     fileNode = (INodeFile) fsdir.getINode(file.toString());
-    dirNode = (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    dirNode = fsdir.getINode(dir.toString()).asDirectory();
     last = dirNode.getDiffs().getLast();
     assertTrue(fileNode.isWithSnapshot());
     assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(last.getSnapshotId()));

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java?rev=1608631&r1=1608630&r2=1608631&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java Tue Jul  8 00:08:18 2014
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import static org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SNAPSHOT_LIMIT;
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature.SNAPSHOT_LIMIT;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -312,10 +312,9 @@ public class TestNestedSnapshots {
   public void testIdCmp() {
     final PermissionStatus perm = PermissionStatus.createImmutable(
         "user", "group", FsPermission.createImmutable((short)0));
-    final INodeDirectory dir = new INodeDirectory(0,
+    final INodeDirectory snapshottable = new INodeDirectory(0,
         DFSUtil.string2Bytes("foo"), perm, 0L);
-    final INodeDirectorySnapshottable snapshottable
-        = new INodeDirectorySnapshottable(dir);
+    snapshottable.addSnapshottableFeature();
     final Snapshot[] snapshots = {
       new Snapshot(1, "s1", snapshottable),
       new Snapshot(1, "s1", snapshottable),
@@ -362,7 +361,7 @@ public class TestNestedSnapshots {
     
     hdfs.allowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());
-    assertTrue(subNode instanceof INodeDirectorySnapshottable);
+    assertTrue(subNode.isDirectory() && subNode.asDirectory().isSnapshottable());
     
     hdfs.disallowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());



Mime
View raw message