hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject [2/2] git commit: HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable directory. Contributed by Jing Zhao.
Date Thu, 04 Sep 2014 06:15:24 GMT
HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable directory. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1608631 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b68818c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b68818c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b68818c4

Branch: refs/heads/branch-2
Commit: b68818c4f085f6657f96c10e975c2b1f27362108
Parents: 7b287d5
Author: Haohui Mai <wheat9@apache.org>
Authored: Tue Jul 8 00:08:18 2014 +0000
Committer: Jing Zhao <jing@hortonworks.com>
Committed: Wed Sep 3 23:11:31 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirectory.java       |  53 +-
 .../hdfs/server/namenode/FSImageFormat.java     |  26 +-
 .../server/namenode/FSImageSerialization.java   |   3 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   3 +-
 .../hdfs/server/namenode/INodeDirectory.java    | 108 ++--
 .../namenode/INodeWithAdditionalFields.java     |   3 +-
 .../hdfs/server/namenode/INodesInPath.java      |   7 +-
 .../snapshot/DirectorySnapshottableFeature.java | 455 +++++++++++++++++
 .../snapshot/DirectoryWithSnapshotFeature.java  |   4 +-
 .../snapshot/FSImageFormatPBSnapshot.java       |  28 +-
 .../snapshot/INodeDirectorySnapshottable.java   | 498 -------------------
 .../hdfs/server/namenode/snapshot/Snapshot.java |   5 +-
 .../namenode/snapshot/SnapshotDiffInfo.java     |   6 +-
 .../snapshot/SnapshotFSImageFormat.java         |  25 +-
 .../namenode/snapshot/SnapshotManager.java      | 105 ++--
 .../namenode/TestFSImageWithSnapshot.java       |   5 +-
 .../server/namenode/TestSnapshotPathINodes.java |  12 +-
 ...tINodeFileUnderConstructionWithSnapshot.java |   8 +-
 .../namenode/snapshot/TestNestedSnapshots.java  |   9 +-
 .../snapshot/TestRenameWithSnapshots.java       |  89 ++--
 .../snapshot/TestSetQuotaWithSnapshot.java      |  31 +-
 .../server/namenode/snapshot/TestSnapshot.java  |  15 +-
 .../namenode/snapshot/TestSnapshotDeletion.java |  11 +-
 .../namenode/snapshot/TestSnapshotManager.java  |  12 +-
 .../namenode/snapshot/TestSnapshotRename.java   |  16 +-
 26 files changed, 752 insertions(+), 788 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 125c47a..25b6dc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -179,6 +179,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6959. Make the HDFS home directory location customizable. (yzhang via
     cmccabe)
 
+    HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable
+    directory. (Jing Zhao via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index ee9ed11..ba154ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -80,7 +80,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.util.ByteArray;
@@ -100,7 +100,7 @@ import com.google.common.collect.Lists;
  **/
 @InterfaceAudience.Private
 public class FSDirectory implements Closeable {
-  private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
+  private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
         INodeDirectory.ROOT_NAME,
@@ -109,9 +109,9 @@ public class FSDirectory implements Closeable {
     r.addDirectoryWithQuotaFeature(
         DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA,
         DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA);
-    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
-    s.setSnapshotQuota(0);
-    return s;
+    r.addSnapshottableFeature();
+    r.setSnapshotQuota(0);
+    return r;
   }
 
   @VisibleForTesting
@@ -633,8 +633,7 @@ public class FSDirectory implements Closeable {
 
     ezManager.checkMoveValidity(srcIIP, dstIIP, src);
     final INode dstInode = dstIIP.getLastINode();
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
+    List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
     if (dstInode != null) { // Destination exists
       validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
       checkSnapshot(dstInode, snapshottableDirs);
@@ -1158,8 +1157,7 @@ public class FSDirectory implements Closeable {
       if (!deleteAllowed(inodesInPath, src) ) {
         filesRemoved = -1;
       } else {
-        List<INodeDirectorySnapshottable> snapshottableDirs = 
-            new ArrayList<INodeDirectorySnapshottable>();
+        List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
         checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
         filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
             removedINodes, mtime);
@@ -1229,8 +1227,7 @@ public class FSDirectory implements Closeable {
         normalizePath(src), false);
     long filesRemoved = -1;
     if (deleteAllowed(inodesInPath, src)) {
-      List<INodeDirectorySnapshottable> snapshottableDirs = 
-          new ArrayList<INodeDirectorySnapshottable>();
+      List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
       checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
       filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
           removedINodes, mtime);
@@ -1305,19 +1302,20 @@ public class FSDirectory implements Closeable {
    *                          but do not have snapshots yet
    */
   private static void checkSnapshot(INode target,
-      List<INodeDirectorySnapshottable> snapshottableDirs) throws SnapshotException {
+      List<INodeDirectory> snapshottableDirs) throws SnapshotException {
     if (target.isDirectory()) {
       INodeDirectory targetDir = target.asDirectory();
-      if (targetDir.isSnapshottable()) {
-        INodeDirectorySnapshottable ssTargetDir = 
-            (INodeDirectorySnapshottable) targetDir;
-        if (ssTargetDir.getNumSnapshots() > 0) {
-          throw new SnapshotException("The directory " + ssTargetDir.getFullPathName()
-              + " cannot be deleted since " + ssTargetDir.getFullPathName()
+      DirectorySnapshottableFeature sf = targetDir
+          .getDirectorySnapshottableFeature();
+      if (sf != null) {
+        if (sf.getNumSnapshots() > 0) {
+          String fullPath = targetDir.getFullPathName();
+          throw new SnapshotException("The directory " + fullPath
+              + " cannot be deleted since " + fullPath
               + " is snapshottable and already has snapshots");
         } else {
           if (snapshottableDirs != null) {
-            snapshottableDirs.add(ssTargetDir);
+            snapshottableDirs.add(targetDir);
           }
         }
       } 
@@ -1405,14 +1403,18 @@ public class FSDirectory implements Closeable {
     Preconditions.checkArgument(
         src.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR),
         "%s does not end with %s", src, HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR);
-    
+
     final String dirPath = normalizePath(src.substring(0,
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    final INodeDirectorySnapshottable dirNode = INodeDirectorySnapshottable
-        .valueOf(node, dirPath);
-    final ReadOnlyList<Snapshot> snapshots = dirNode.getSnapshotList();
+    final INodeDirectory dirNode = INodeDirectory.valueOf(node, dirPath);
+    final DirectorySnapshottableFeature sf = dirNode.getDirectorySnapshottableFeature();
+    if (sf == null) {
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + dirPath);
+    }
+    final ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
     int skipSize = ReadOnlyList.Util.binarySearch(snapshots, startAfter);
     skipSize = skipSize < 0 ? -skipSize - 1 : skipSize + 1;
     int numOfListing = Math.min(snapshots.size() - skipSize, this.lsLimit);
@@ -1476,9 +1478,8 @@ public class FSDirectory implements Closeable {
         src.length() - HdfsConstants.DOT_SNAPSHOT_DIR.length()));
     
     final INode node = this.getINode(dirPath);
-    if (node != null
-        && node.isDirectory()
-        && node.asDirectory() instanceof INodeDirectorySnapshottable) {
+    if (node != null && node.isDirectory()
+        && node.asDirectory().isSnapshottable()) {
       return node;
     }
     return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index 839bcba..2677332 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
@@ -74,8 +73,8 @@ import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * Contains inner classes for reading or writing the on-disk format for
@@ -555,21 +554,17 @@ public class FSImageFormat {
       if (!toLoadSubtree) {
         return;
       }
-      
+
       // Step 2. Load snapshots if parent is snapshottable
       int numSnapshots = in.readInt();
       if (numSnapshots >= 0) {
-        final INodeDirectorySnapshottable snapshottableParent
-            = INodeDirectorySnapshottable.valueOf(parent, parent.getLocalName());
         // load snapshots and snapshotQuota
-        SnapshotFSImageFormat.loadSnapshotList(snapshottableParent,
-            numSnapshots, in, this);
-        if (snapshottableParent.getSnapshotQuota() > 0) {
+        SnapshotFSImageFormat.loadSnapshotList(parent, numSnapshots, in, this);
+        if (parent.getDirectorySnapshottableFeature().getSnapshotQuota() > 0) {
           // add the directory to the snapshottable directory list in 
           // SnapshotManager. Note that we only add root when its snapshot quota
           // is positive.
-          this.namesystem.getSnapshotManager().addSnapshottable(
-              snapshottableParent);
+          this.namesystem.getSnapshotManager().addSnapshottable(parent);
         }
       }
 
@@ -832,7 +827,10 @@ public class FSImageFormat {
       if (withSnapshot) {
         dir.addSnapshotFeature(null);
       }
-      return snapshottable ? new INodeDirectorySnapshottable(dir) : dir;
+      if (snapshottable) {
+        dir.addSnapshottableFeature();
+      }
+      return dir;
     } else if (numBlocks == -2) {
       //symlink
       if (!FileSystem.areSymlinksEnabled()) {
@@ -1383,10 +1381,8 @@ public class FSImageFormat {
 
       // 2. Write INodeDirectorySnapshottable#snapshotsByNames to record all
       // Snapshots
-      if (current instanceof INodeDirectorySnapshottable) {
-        INodeDirectorySnapshottable snapshottableNode =
-            (INodeDirectorySnapshottable) current;
-        SnapshotFSImageFormat.saveSnapshots(snapshottableNode, out);
+      if (current.isDirectory() && current.asDirectory().isSnapshottable()) {
+        SnapshotFSImageFormat.saveSnapshots(current.asDirectory(), out);
       } else {
         out.writeInt(-1); // # of snapshots
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 314d55f..eb8354d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 import org.apache.hadoop.hdfs.util.XMLUtils;
@@ -241,7 +240,7 @@ public class FSImageSerialization {
 
     writeQuota(node.getQuotaCounts(), out);
 
-    if (node instanceof INodeDirectorySnapshottable) {
+    if (node.isSnapshottable()) {
       out.writeBoolean(true);
     } else {
       out.writeBoolean(false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b2a2eda..8dc4241 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -239,7 +239,6 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -7966,7 +7965,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Remove a list of INodeDirectorySnapshottable from the SnapshotManager
    * @param toRemove the list of INodeDirectorySnapshottable to be removed
    */
-  void removeSnapshottableDirs(List<INodeDirectorySnapshottable> toRemove) {
+  void removeSnapshottableDirs(List<INodeDirectory> toRemove) {
     if (snapshotManager != null) {
       snapshotManager.removeSnapshottable(toRemove);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index e7d6a3d..18b4109 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -29,10 +29,11 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -102,11 +103,6 @@ public class INodeDirectory extends INodeWithAdditionalFields
     return this;
   }
 
-  /** Is this a snapshottable directory? */
-  public boolean isSnapshottable() {
-    return false;
-  }
-
   void setQuota(long nsQuota, long dsQuota) {
     DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
     if (quota != null) {
@@ -186,7 +182,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
   public final boolean isWithSnapshot() {
     return getDirectoryWithSnapshotFeature() != null;
   }
-  
+
   public DirectoryDiffList getDiffs() {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     return sf != null ? sf.getDiffs() : null;
@@ -204,50 +200,71 @@ public class INodeDirectory extends INodeWithAdditionalFields
     return super.toDetailString() + (sf == null ? "" : ", " + sf.getDiffs()); 
   }
 
-  /** Replace itself with an {@link INodeDirectorySnapshottable}. */
-  public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
-      int latestSnapshotId, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    Preconditions.checkState(!(this instanceof INodeDirectorySnapshottable),
-        "this is already an INodeDirectorySnapshottable, this=%s", this);
-    final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(this);
-    replaceSelf(s, inodeMap).getDirectoryWithSnapshotFeature().getDiffs()
-        .saveSelf2Snapshot(latestSnapshotId, s, this);
-    return s;
+  public DirectorySnapshottableFeature getDirectorySnapshottableFeature() {
+    return getFeature(DirectorySnapshottableFeature.class);
+  }
+
+  public boolean isSnapshottable() {
+    return getDirectorySnapshottableFeature() != null;
   }
 
-  /** Replace itself with {@link INodeDirectory}. */
-  public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
-    Preconditions.checkState(getClass() != INodeDirectory.class,
-        "the class is already INodeDirectory, this=%s", this);
-    return replaceSelf(new INodeDirectory(this, true, this.getFeatures()),
-      inodeMap);
+  public Snapshot getSnapshot(byte[] snapshotName) {
+    return getDirectorySnapshottableFeature().getSnapshot(snapshotName);
   }
 
-  /** Replace itself with the given directory. */
-  private final <N extends INodeDirectory> N replaceSelf(final N newDir,
-      final INodeMap inodeMap) {
-    final INodeReference ref = getParentReference();
-    if (ref != null) {
-      ref.setReferredINode(newDir);
-      if (inodeMap != null) {
-        inodeMap.put(newDir);
-      }
-    } else {
-      final INodeDirectory parent = getParent();
-      Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
-      parent.replaceChild(this, newDir, inodeMap);
+  public void setSnapshotQuota(int snapshotQuota) {
+    getDirectorySnapshottableFeature().setSnapshotQuota(snapshotQuota);
+  }
+
+  public Snapshot addSnapshot(int id, String name) throws SnapshotException,
+      QuotaExceededException {
+    return getDirectorySnapshottableFeature().addSnapshot(this, id, name);
+  }
+
+  public Snapshot removeSnapshot(String snapshotName,
+      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
+      throws SnapshotException {
+    return getDirectorySnapshottableFeature().removeSnapshot(this,
+        snapshotName, collectedBlocks, removedINodes);
+  }
+
+  public void renameSnapshot(String path, String oldName, String newName)
+      throws SnapshotException {
+    getDirectorySnapshottableFeature().renameSnapshot(path, oldName, newName);
+  }
+
+  /** add DirectorySnapshottableFeature */
+  public void addSnapshottableFeature() {
+    Preconditions.checkState(!isSnapshottable(),
+        "this is already snapshottable, this=%s", this);
+    DirectoryWithSnapshotFeature s = this.getDirectoryWithSnapshotFeature();
+    final DirectorySnapshottableFeature snapshottable =
+        new DirectorySnapshottableFeature(s);
+    if (s != null) {
+      this.removeFeature(s);
     }
-    clear();
-    return newDir;
+    this.addFeature(snapshottable);
   }
-  
+
+  /** remove DirectorySnapshottableFeature */
+  public void removeSnapshottableFeature() {
+    DirectorySnapshottableFeature s = getDirectorySnapshottableFeature();
+    Preconditions.checkState(s != null,
+        "The dir does not have snapshottable feature: this=%s", this);
+    this.removeFeature(s);
+    if (s.getDiffs().asList().size() > 0) {
+      // add a DirectoryWithSnapshotFeature back
+      DirectoryWithSnapshotFeature sf = new DirectoryWithSnapshotFeature(
+          s.getDiffs());
+      addFeature(sf);
+    }
+  }
+
   /** 
    * Replace the given child with a new child. Note that we no longer need to
    * replace an normal INodeDirectory or INodeFile into an
    * INodeDirectoryWithSnapshot or INodeFileUnderConstruction. The only cases
-   * for child replacement is for {@link INodeDirectorySnapshottable} and 
-   * reference nodes.
+   * for child replacement is for reference nodes.
    */
   public void replaceChild(INode oldChild, final INode newChild,
       final INodeMap inodeMap) {
@@ -821,6 +838,11 @@ public class INodeDirectory extends INodeWithAdditionalFields
         };
       }
     });
+
+    final DirectorySnapshottableFeature s = getDirectorySnapshottableFeature();
+    if (s != null) {
+      s.dumpTreeRecursively(this, out, prefix, snapshot);
+    }
   }
 
   /**
@@ -829,7 +851,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
    * @param subs The subtrees.
    */
   @VisibleForTesting
-  protected static void dumpTreeRecursively(PrintWriter out,
+  public static void dumpTreeRecursively(PrintWriter out,
       StringBuilder prefix, Iterable<SnapshotAndINode> subs) {
     if (subs != null) {
       for(final Iterator<SnapshotAndINode> i = subs.iterator(); i.hasNext();) {
@@ -842,7 +864,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
   }
 
   /** A pair of Snapshot and INode objects. */
-  protected static class SnapshotAndINode {
+  public static class SnapshotAndINode {
     public final int snapshotId;
     public final INode inode;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
index 47f86e5..93da052 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
@@ -315,8 +315,9 @@ public abstract class INodeWithAdditionalFields extends INode
   }
 
   protected <T extends Feature> T getFeature(Class<? extends Feature> clazz) {
+    Preconditions.checkArgument(clazz != null);
     for (Feature f : features) {
-      if (f.getClass() == clazz) {
+      if (clazz.isAssignableFrom(f.getClass())) {
         @SuppressWarnings("unchecked")
         T ret = (T) f;
         return ret;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
index a3507d3..34a3268 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodesInPath.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.base.Preconditions;
@@ -208,8 +207,7 @@ public class INodesInPath {
       final byte[] childName = components[count + 1];
       
       // check if the next byte[] in components is for ".snapshot"
-      if (isDotSnapshotDir(childName)
-          && isDir && dir instanceof INodeDirectorySnapshottable) {
+      if (isDotSnapshotDir(childName) && isDir && dir.isSnapshottable()) {
         // skip the ".snapshot" in components
         count++;
         index++;
@@ -222,8 +220,7 @@ public class INodesInPath {
           break;
         }
         // Resolve snapshot root
-        final Snapshot s = ((INodeDirectorySnapshottable)dir).getSnapshot(
-            components[count + 1]);
+        final Snapshot s = dir.getSnapshot(components[count + 1]);
         if (s == null) {
           //snapshot not found
           curNode = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
new file mode 100644
index 0000000..ae70b88
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.SnapshotAndINode;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
+import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.util.Diff.ListType;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A directory with this feature is a snapshottable directory, where snapshots
+ * can be taken. This feature extends {@link DirectoryWithSnapshotFeature}, and
+ * maintains extra information about all the snapshots taken on this directory.
+ */
+@InterfaceAudience.Private
+public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature {
+  /** Limit the number of snapshot per snapshottable directory. */
+  static final int SNAPSHOT_LIMIT = 1 << 16;
+
+  /**
+   * Snapshots of this directory in ascending order of snapshot names.
+   * Note that snapshots in ascending order of snapshot id are stored in
+   * {@link INodeDirectoryWithSnapshot}.diffs (a private field).
+   */
+  private final List<Snapshot> snapshotsByNames = new ArrayList<Snapshot>();
+  /** Number of snapshots allowed. */
+  private int snapshotQuota = SNAPSHOT_LIMIT;
+
+  public DirectorySnapshottableFeature(DirectoryWithSnapshotFeature feature) {
+    super(feature == null ? null : feature.getDiffs());
+  }
+
+  /** @return the number of existing snapshots. */
+  public int getNumSnapshots() {
+    return snapshotsByNames.size();
+  }
+
+  private int searchSnapshot(byte[] snapshotName) {
+    return Collections.binarySearch(snapshotsByNames, snapshotName);
+  }
+
+  /** @return the snapshot with the given name. */
+  public Snapshot getSnapshot(byte[] snapshotName) {
+    final int i = searchSnapshot(snapshotName);
+    return i < 0? null: snapshotsByNames.get(i);
+  }
+
+  public Snapshot getSnapshotById(int sid) {
+    for (Snapshot s : snapshotsByNames) {
+      if (s.getId() == sid) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  /** @return {@link #snapshotsByNames} as a {@link ReadOnlyList} */
+  public ReadOnlyList<Snapshot> getSnapshotList() {
+    return ReadOnlyList.Util.asReadOnlyList(snapshotsByNames);
+  }
+
+  /**
+   * Rename a snapshot
+   * @param path
+   *          The directory path where the snapshot was taken. Used for
+   *          generating exception message.
+   * @param oldName
+   *          Old name of the snapshot
+   * @param newName
+   *          New name the snapshot will be renamed to
+   * @throws SnapshotException
+   *           Throw SnapshotException when either the snapshot with the old
+   *           name does not exist or a snapshot with the new name already
+   *           exists
+   */
+  public void renameSnapshot(String path, String oldName, String newName)
+      throws SnapshotException {
+    if (newName.equals(oldName)) {
+      return;
+    }
+    final int indexOfOld = searchSnapshot(DFSUtil.string2Bytes(oldName));
+    if (indexOfOld < 0) {
+      throw new SnapshotException("The snapshot " + oldName
+          + " does not exist for directory " + path);
+    } else {
+      final byte[] newNameBytes = DFSUtil.string2Bytes(newName);
+      int indexOfNew = searchSnapshot(newNameBytes);
+      if (indexOfNew >= 0) {
+        throw new SnapshotException("The snapshot " + newName
+            + " already exists for directory " + path);
+      }
+      // remove the one with old name from snapshotsByNames
+      Snapshot snapshot = snapshotsByNames.remove(indexOfOld);
+      final INodeDirectory ssRoot = snapshot.getRoot();
+      ssRoot.setLocalName(newNameBytes);
+      indexOfNew = -indexOfNew - 1;
+      if (indexOfNew <= indexOfOld) {
+        snapshotsByNames.add(indexOfNew, snapshot);
+      } else { // indexOfNew > indexOfOld
+        snapshotsByNames.add(indexOfNew - 1, snapshot);
+      }
+    }
+  }
+
+  public int getSnapshotQuota() {
+    return snapshotQuota;
+  }
+
+  public void setSnapshotQuota(int snapshotQuota) {
+    if (snapshotQuota < 0) {
+      throw new HadoopIllegalArgumentException(
+          "Cannot set snapshot quota to " + snapshotQuota + " < 0");
+    }
+    this.snapshotQuota = snapshotQuota;
+  }
+
+  /**
+   * Simply add a snapshot into the {@link #snapshotsByNames}. Used when loading
+   * fsimage.
+   */
+  void addSnapshot(Snapshot snapshot) {
+    this.snapshotsByNames.add(snapshot);
+  }
+
+  /** Add a snapshot. */
+  public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name)
+      throws SnapshotException, QuotaExceededException {
+    //check snapshot quota
+    final int n = getNumSnapshots();
+    if (n + 1 > snapshotQuota) {
+      throw new SnapshotException("Failed to add snapshot: there are already "
+          + n + " snapshot(s) and the snapshot quota is "
+          + snapshotQuota);
+    }
+    final Snapshot s = new Snapshot(id, name, snapshotRoot);
+    final byte[] nameBytes = s.getRoot().getLocalNameBytes();
+    final int i = searchSnapshot(nameBytes);
+    if (i >= 0) {
+      throw new SnapshotException("Failed to add snapshot: there is already a "
+          + "snapshot with the same name \"" + Snapshot.getSnapshotName(s) + "\".");
+    }
+
+    final DirectoryDiff d = getDiffs().addDiff(id, snapshotRoot);
+    d.setSnapshotRoot(s.getRoot());
+    snapshotsByNames.add(-i - 1, s);
+
+    // set modification time
+    final long now = Time.now();
+    snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID);
+    s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
+    return s;
+  }
+
+  /**
+   * Remove the snapshot with the given name from {@link #snapshotsByNames},
+   * and delete all the corresponding DirectoryDiff.
+   *
+   * @param snapshotRoot The directory where we take snapshots
+   * @param snapshotName The name of the snapshot to be removed
+   * @param collectedBlocks Used to collect information to update blocksMap
+   * @return The removed snapshot. Null if no snapshot with the given name
+   *         exists.
+   */
+  public Snapshot removeSnapshot(INodeDirectory snapshotRoot,
+      String snapshotName, BlocksMapUpdateInfo collectedBlocks,
+      final List<INode> removedINodes) throws SnapshotException {
+    final int i = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
+    if (i < 0) {
+      throw new SnapshotException("Cannot delete snapshot " + snapshotName
+          + " from path " + snapshotRoot.getFullPathName()
+          + ": the snapshot does not exist.");
+    } else {
+      final Snapshot snapshot = snapshotsByNames.get(i);
+      int prior = Snapshot.findLatestSnapshot(snapshotRoot, snapshot.getId());
+      try {
+        Quota.Counts counts = snapshotRoot.cleanSubtree(snapshot.getId(),
+            prior, collectedBlocks, removedINodes, true);
+        INodeDirectory parent = snapshotRoot.getParent();
+        if (parent != null) {
+          // there will not be any WithName node corresponding to the deleted
+          // snapshot, thus only update the quota usage in the current tree
+          parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
+              -counts.get(Quota.DISKSPACE), true);
+        }
+      } catch(QuotaExceededException e) {
+        INode.LOG.error("BUG: removeSnapshot increases namespace usage.", e);
+      }
+      // remove from snapshotsByNames after successfully cleaning the subtree
+      snapshotsByNames.remove(i);
+      return snapshot;
+    }
+  }
+
+  public ContentSummaryComputationContext computeContentSummary(
+      final INodeDirectory snapshotRoot,
+      final ContentSummaryComputationContext summary) {
+    snapshotRoot.computeContentSummary(summary);
+    summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
+    summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
+    return summary;
+  }
+
+  /**
+   * Compute the difference between two snapshots (or a snapshot and the current
+   * directory) of the directory.
+   *
+   * @param from The name of the start point of the comparison. Null indicating
+   *          the current tree.
+   * @param to The name of the end point. Null indicating the current tree.
+   * @return The difference between the start/end points.
+   * @throws SnapshotException If there is no snapshot matching the starting
+   *           point, or if endSnapshotName is not null but cannot be identified
+   *           as a previous snapshot.
+   */
+  SnapshotDiffInfo computeDiff(final INodeDirectory snapshotRoot,
+      final String from, final String to) throws SnapshotException {
+    Snapshot fromSnapshot = getSnapshotByName(snapshotRoot, from);
+    Snapshot toSnapshot = getSnapshotByName(snapshotRoot, to);
+    // if the start point is equal to the end point, return null
+    if (from.equals(to)) {
+      return null;
+    }
+    SnapshotDiffInfo diffs = new SnapshotDiffInfo(snapshotRoot, fromSnapshot,
+        toSnapshot);
+    computeDiffRecursively(snapshotRoot, snapshotRoot, new ArrayList<byte[]>(),
+        diffs);
+    return diffs;
+  }
+
+  /**
+   * Find the snapshot matching the given name.
+   *
+   * @param snapshotRoot The directory where snapshots were taken.
+   * @param snapshotName The name of the snapshot.
+   * @return The corresponding snapshot. Null if snapshotName is null or empty.
+   * @throws SnapshotException If snapshotName is not null or empty, but there
+   *           is no snapshot matching the name.
+   */
+  private Snapshot getSnapshotByName(INodeDirectory snapshotRoot,
+      String snapshotName) throws SnapshotException {
+    Snapshot s = null;
+    if (snapshotName != null && !snapshotName.isEmpty()) {
+      final int index = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
+      if (index < 0) {
+        throw new SnapshotException("Cannot find the snapshot of directory "
+            + snapshotRoot.getFullPathName() + " with name " + snapshotName);
+      }
+      s = snapshotsByNames.get(index);
+    }
+    return s;
+  }
+
+  /**
+   * Recursively compute the difference between snapshots under a given
+   * directory/file.
+   * @param snapshotRoot The directory where snapshots were taken.
+   * @param node The directory/file under which the diff is computed.
+   * @param parentPath Relative path (corresponding to the snapshot root) of
+   *                   the node's parent.
+   * @param diffReport data structure used to store the diff.
+   */
+  private void computeDiffRecursively(final INodeDirectory snapshotRoot,
+      INode node, List<byte[]> parentPath, SnapshotDiffInfo diffReport) {
+    final Snapshot earlierSnapshot = diffReport.isFromEarlier() ?
+        diffReport.getFrom() : diffReport.getTo();
+    final Snapshot laterSnapshot = diffReport.isFromEarlier() ?
+        diffReport.getTo() : diffReport.getFrom();
+    byte[][] relativePath = parentPath.toArray(new byte[parentPath.size()][]);
+    if (node.isDirectory()) {
+      final ChildrenDiff diff = new ChildrenDiff();
+      INodeDirectory dir = node.asDirectory();
+      DirectoryWithSnapshotFeature sf = dir.getDirectoryWithSnapshotFeature();
+      if (sf != null) {
+        boolean change = sf.computeDiffBetweenSnapshots(earlierSnapshot,
+            laterSnapshot, diff, dir);
+        if (change) {
+          diffReport.addDirDiff(dir, relativePath, diff);
+        }
+      }
+      ReadOnlyList<INode> children = dir.getChildrenList(earlierSnapshot
+          .getId());
+      for (INode child : children) {
+        final byte[] name = child.getLocalNameBytes();
+        boolean toProcess = diff.searchIndex(ListType.DELETED, name) < 0;
+        if (!toProcess && child instanceof INodeReference.WithName) {
+          byte[][] renameTargetPath = findRenameTargetPath(
+              snapshotRoot, (WithName) child,
+              laterSnapshot == null ? Snapshot.CURRENT_STATE_ID :
+                laterSnapshot.getId());
+          if (renameTargetPath != null) {
+            toProcess = true;
+            diffReport.setRenameTarget(child.getId(), renameTargetPath);
+          }
+        }
+        if (toProcess) {
+          parentPath.add(name);
+          computeDiffRecursively(snapshotRoot, child, parentPath, diffReport);
+          parentPath.remove(parentPath.size() - 1);
+        }
+      }
+    } else if (node.isFile() && node.asFile().isWithSnapshot()) {
+      INodeFile file = node.asFile();
+      boolean change = file.getFileWithSnapshotFeature()
+          .changedBetweenSnapshots(file, earlierSnapshot, laterSnapshot);
+      if (change) {
+        diffReport.addFileDiff(file, relativePath);
+      }
+    }
+  }
+
+  /**
+   * We just found a deleted WithName node as the source of a rename operation.
+   * However, we should include it in our snapshot diff report as rename only
+   * if the rename target is also under the same snapshottable directory.
+   */
+  private byte[][] findRenameTargetPath(final INodeDirectory snapshotRoot,
+      INodeReference.WithName wn, final int snapshotId) {
+    INode inode = wn.getReferredINode();
+    final LinkedList<byte[]> ancestors = Lists.newLinkedList();
+    while (inode != null) {
+      if (inode == snapshotRoot) {
+        return ancestors.toArray(new byte[ancestors.size()][]);
+      }
+      if (inode instanceof INodeReference.WithCount) {
+        inode = ((WithCount) inode).getParentRef(snapshotId);
+      } else {
+        INode parent = inode.getParentReference() != null ? inode
+            .getParentReference() : inode.getParent();
+        if (parent != null && parent instanceof INodeDirectory) {
+          int sid = parent.asDirectory().searchChild(inode);
+          if (sid < snapshotId) {
+            return null;
+          }
+        }
+        if (!(parent instanceof WithCount)) {
+          ancestors.addFirst(inode.getLocalNameBytes());
+        }
+        inode = parent;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return "snapshotsByNames=" + snapshotsByNames;
+  }
+
+  @VisibleForTesting
+  public void dumpTreeRecursively(INodeDirectory snapshotRoot, PrintWriter out,
+      StringBuilder prefix, int snapshot) {
+    if (snapshot == Snapshot.CURRENT_STATE_ID) {
+      out.println();
+      out.print(prefix);
+
+      out.print("Snapshot of ");
+      final String name = snapshotRoot.getLocalName();
+      out.print(name.isEmpty()? "/": name);
+      out.print(": quota=");
+      out.print(getSnapshotQuota());
+
+      int n = 0;
+      for(DirectoryDiff diff : getDiffs()) {
+        if (diff.isSnapshotRoot()) {
+          n++;
+        }
+      }
+      Preconditions.checkState(n == snapshotsByNames.size(), "#n=" + n
+          + ", snapshotsByNames.size()=" + snapshotsByNames.size());
+      out.print(", #snapshot=");
+      out.println(n);
+
+      INodeDirectory.dumpTreeRecursively(out, prefix,
+          new Iterable<SnapshotAndINode>() {
+        @Override
+        public Iterator<SnapshotAndINode> iterator() {
+          return new Iterator<SnapshotAndINode>() {
+            final Iterator<DirectoryDiff> i = getDiffs().iterator();
+            private DirectoryDiff next = findNext();
+
+            private DirectoryDiff findNext() {
+              for(; i.hasNext(); ) {
+                final DirectoryDiff diff = i.next();
+                if (diff.isSnapshotRoot()) {
+                  return diff;
+                }
+              }
+              return null;
+            }
+
+            @Override
+            public boolean hasNext() {
+              return next != null;
+            }
+
+            @Override
+            public SnapshotAndINode next() {
+              final SnapshotAndINode pair = new SnapshotAndINode(next
+                  .getSnapshotId(), getSnapshotById(next.getSnapshotId())
+                  .getRoot());
+              next = findNext();
+              return pair;
+            }
+
+            @Override
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index af3b0e1..9c9d435 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -48,7 +48,9 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import com.google.common.base.Preconditions;
 
 /**
- * Feature for directory with snapshot-related information.
+ * Feature used to store and process the snapshot diff information for a
+ * directory. In particular, it contains a directory diff list recording changes
+ * made to the directory and its children for each snapshot.
  */
 @InterfaceAudience.Private
 public class DirectoryWithSnapshotFeature implements INode.Feature {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index b36b4ab..3f4cda5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -127,9 +127,8 @@ public class FSImageFormatPBSnapshot {
     }
 
     /**
-     * Load the snapshots section from fsimage. Also convert snapshottable
-     * directories into {@link INodeDirectorySnapshottable}.
-     *
+     * Load the snapshots section from fsimage. Also add snapshottable feature
+     * to snapshottable directories.
      */
     public void loadSnapshotSection(InputStream in) throws IOException {
       SnapshotManager sm = fsn.getSnapshotManager();
@@ -139,16 +138,13 @@ public class FSImageFormatPBSnapshot {
       sm.setSnapshotCounter(section.getSnapshotCounter());
       for (long sdirId : section.getSnapshottableDirList()) {
         INodeDirectory dir = fsDir.getInode(sdirId).asDirectory();
-        final INodeDirectorySnapshottable sdir;
         if (!dir.isSnapshottable()) {
-          sdir = new INodeDirectorySnapshottable(dir);
-          fsDir.addToInodeMap(sdir);
+          dir.addSnapshottableFeature();
         } else {
           // dir is root, and admin set root to snapshottable before
-          sdir = (INodeDirectorySnapshottable) dir;
-          sdir.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+          dir.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
         }
-        sm.addSnapshottable(sdir);
+        sm.addSnapshottable(dir);
       }
       loadSnapshots(in, snum);
     }
@@ -160,12 +156,11 @@ public class FSImageFormatPBSnapshot {
         INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
             parent.getLoaderContext());
         int sid = pbs.getSnapshotId();
-        INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
-            .getInode(root.getId()).asDirectory();
+        INodeDirectory parent = fsDir.getInode(root.getId()).asDirectory();
         Snapshot snapshot = new Snapshot(sid, root, parent);
         // add the snapshot to parent, since we follow the sequence of
         // snapshotsByNames when saving, we do not need to sort when loading
-        parent.addSnapshot(snapshot);
+        parent.getDirectorySnapshottableFeature().addSnapshot(snapshot);
         snapshotMap.put(sid, snapshot);
       }
     }
@@ -373,14 +368,15 @@ public class FSImageFormatPBSnapshot {
           .setSnapshotCounter(sm.getSnapshotCounter())
           .setNumSnapshots(sm.getNumSnapshots());
 
-      INodeDirectorySnapshottable[] snapshottables = sm.getSnapshottableDirs();
-      for (INodeDirectorySnapshottable sdir : snapshottables) {
+      INodeDirectory[] snapshottables = sm.getSnapshottableDirs();
+      for (INodeDirectory sdir : snapshottables) {
         b.addSnapshottableDir(sdir.getId());
       }
       b.build().writeDelimitedTo(out);
       int i = 0;
-      for(INodeDirectorySnapshottable sdir : snapshottables) {
-        for(Snapshot s : sdir.getSnapshotsByNames()) {
+      for(INodeDirectory sdir : snapshottables) {
+        for (Snapshot s : sdir.getDirectorySnapshottableFeature()
+            .getSnapshotList()) {
           Root sroot = s.getRoot();
           SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
               .newBuilder().setSnapshotId(s.getId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
deleted file mode 100644
index 5a6ed75..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.snapshot;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.SnapshotException;
-import org.apache.hadoop.hdfs.server.namenode.Content;
-import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
-import org.apache.hadoop.hdfs.server.namenode.INodeFile;
-import org.apache.hadoop.hdfs.server.namenode.INodeMap;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.ChildrenDiff;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
-import org.apache.hadoop.hdfs.util.Diff.ListType;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Directories where taking snapshots is allowed.
- * 
- * Like other {@link INode} subclasses, this class is synchronized externally
- * by the namesystem and FSDirectory locks.
- */
-@InterfaceAudience.Private
-public class INodeDirectorySnapshottable extends INodeDirectory {
-  /** Limit the number of snapshot per snapshottable directory. */
-  static final int SNAPSHOT_LIMIT = 1 << 16;
-
-  /** Cast INode to INodeDirectorySnapshottable. */
-  static public INodeDirectorySnapshottable valueOf(
-      INode inode, String src) throws IOException {
-    final INodeDirectory dir = INodeDirectory.valueOf(inode, src);
-    if (!dir.isSnapshottable()) {
-      throw new SnapshotException(
-          "Directory is not a snapshottable directory: " + src);
-    }
-    return (INodeDirectorySnapshottable)dir;
-  }
-
-  /**
-   * Snapshots of this directory in ascending order of snapshot names.
-   * Note that snapshots in ascending order of snapshot id are stored in
-   * {@link INodeDirectoryWithSnapshot}.diffs (a private field).
-   */
-  private final List<Snapshot> snapshotsByNames = new ArrayList<Snapshot>();
-
-  /**
-   * @return {@link #snapshotsByNames}
-   */
-  ReadOnlyList<Snapshot> getSnapshotsByNames() {
-    return ReadOnlyList.Util.asReadOnlyList(this.snapshotsByNames);
-  }
-  
-  /** Number of snapshots allowed. */
-  private int snapshotQuota = SNAPSHOT_LIMIT;
-
-  public INodeDirectorySnapshottable(INodeDirectory dir) {
-    super(dir, true, dir.getFeatures());
-    // add snapshot feature if the original directory does not have it
-    if (!isWithSnapshot()) {
-      addSnapshotFeature(null);
-    }
-  }
-  
-  /** @return the number of existing snapshots. */
-  public int getNumSnapshots() {
-    return snapshotsByNames.size();
-  }
-  
-  private int searchSnapshot(byte[] snapshotName) {
-    return Collections.binarySearch(snapshotsByNames, snapshotName);
-  }
-
-  /** @return the snapshot with the given name. */
-  public Snapshot getSnapshot(byte[] snapshotName) {
-    final int i = searchSnapshot(snapshotName);
-    return i < 0? null: snapshotsByNames.get(i);
-  }
-  
-  public Snapshot getSnapshotById(int sid) {
-    for (Snapshot s : snapshotsByNames) {
-      if (s.getId() == sid) {
-        return s;
-      }
-    }
-    return null;
-  }
-  
-  /** @return {@link #snapshotsByNames} as a {@link ReadOnlyList} */
-  public ReadOnlyList<Snapshot> getSnapshotList() {
-    return ReadOnlyList.Util.asReadOnlyList(snapshotsByNames);
-  }
-  
-  /**
-   * Rename a snapshot
-   * @param path
-   *          The directory path where the snapshot was taken. Used for
-   *          generating exception message.
-   * @param oldName
-   *          Old name of the snapshot
-   * @param newName
-   *          New name the snapshot will be renamed to
-   * @throws SnapshotException
-   *           Throw SnapshotException when either the snapshot with the old
-   *           name does not exist or a snapshot with the new name already
-   *           exists
-   */
-  void renameSnapshot(String path, String oldName, String newName)
-      throws SnapshotException {
-    if (newName.equals(oldName)) {
-      return;
-    }
-    final int indexOfOld = searchSnapshot(DFSUtil.string2Bytes(oldName));
-    if (indexOfOld < 0) {
-      throw new SnapshotException("The snapshot " + oldName
-          + " does not exist for directory " + path);
-    } else {
-      final byte[] newNameBytes = DFSUtil.string2Bytes(newName);
-      int indexOfNew = searchSnapshot(newNameBytes);
-      if (indexOfNew >= 0) {
-        throw new SnapshotException("The snapshot " + newName
-            + " already exists for directory " + path);
-      }
-      // remove the one with old name from snapshotsByNames
-      Snapshot snapshot = snapshotsByNames.remove(indexOfOld);
-      final INodeDirectory ssRoot = snapshot.getRoot();
-      ssRoot.setLocalName(newNameBytes);
-      indexOfNew = -indexOfNew - 1;
-      if (indexOfNew <= indexOfOld) {
-        snapshotsByNames.add(indexOfNew, snapshot);
-      } else { // indexOfNew > indexOfOld
-        snapshotsByNames.add(indexOfNew - 1, snapshot);
-      }
-    }
-  }
-
-  public int getSnapshotQuota() {
-    return snapshotQuota;
-  }
-
-  public void setSnapshotQuota(int snapshotQuota) {
-    if (snapshotQuota < 0) {
-      throw new HadoopIllegalArgumentException(
-          "Cannot set snapshot quota to " + snapshotQuota + " < 0");
-    }
-    this.snapshotQuota = snapshotQuota;
-  }
-
-  @Override
-  public boolean isSnapshottable() {
-    return true;
-  }
-  
-  /**
-   * Simply add a snapshot into the {@link #snapshotsByNames}. Used by FSImage
-   * loading.
-   */
-  void addSnapshot(Snapshot snapshot) {
-    this.snapshotsByNames.add(snapshot);
-  }
-
-  /** Add a snapshot. */
-  Snapshot addSnapshot(int id, String name) throws SnapshotException,
-      QuotaExceededException {
-    //check snapshot quota
-    final int n = getNumSnapshots();
-    if (n + 1 > snapshotQuota) {
-      throw new SnapshotException("Failed to add snapshot: there are already "
-          + n + " snapshot(s) and the snapshot quota is "
-          + snapshotQuota);
-    }
-    final Snapshot s = new Snapshot(id, name, this);
-    final byte[] nameBytes = s.getRoot().getLocalNameBytes();
-    final int i = searchSnapshot(nameBytes);
-    if (i >= 0) {
-      throw new SnapshotException("Failed to add snapshot: there is already a "
-          + "snapshot with the same name \"" + Snapshot.getSnapshotName(s) + "\".");
-    }
-
-    final DirectoryDiff d = getDiffs().addDiff(id, this);
-    d.setSnapshotRoot(s.getRoot());
-    snapshotsByNames.add(-i - 1, s);
-
-    //set modification time
-    updateModificationTime(Time.now(), Snapshot.CURRENT_STATE_ID);
-    s.getRoot().setModificationTime(getModificationTime(),
-        Snapshot.CURRENT_STATE_ID);
-    return s;
-  }
-  
-  /**
-   * Remove the snapshot with the given name from {@link #snapshotsByNames},
-   * and delete all the corresponding DirectoryDiff.
-   * 
-   * @param snapshotName The name of the snapshot to be removed
-   * @param collectedBlocks Used to collect information to update blocksMap
-   * @return The removed snapshot. Null if no snapshot with the given name 
-   *         exists.
-   */
-  Snapshot removeSnapshot(String snapshotName,
-      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes)
-      throws SnapshotException {
-    final int i = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
-    if (i < 0) {
-      throw new SnapshotException("Cannot delete snapshot " + snapshotName
-          + " from path " + this.getFullPathName()
-          + ": the snapshot does not exist.");
-    } else {
-      final Snapshot snapshot = snapshotsByNames.get(i);
-      int prior = Snapshot.findLatestSnapshot(this, snapshot.getId());
-      try {
-        Quota.Counts counts = cleanSubtree(snapshot.getId(), prior,
-            collectedBlocks, removedINodes, true);
-        INodeDirectory parent = getParent();
-        if (parent != null) {
-          // there will not be any WithName node corresponding to the deleted 
-          // snapshot, thus only update the quota usage in the current tree
-          parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
-              -counts.get(Quota.DISKSPACE), true);
-        }
-      } catch(QuotaExceededException e) {
-        LOG.error("BUG: removeSnapshot increases namespace usage.", e);
-      }
-      // remove from snapshotsByNames after successfully cleaning the subtree
-      snapshotsByNames.remove(i);
-      return snapshot;
-    }
-  }
-  
-  @Override
-  public ContentSummaryComputationContext computeContentSummary(
-      final ContentSummaryComputationContext summary) {
-    super.computeContentSummary(summary);
-    summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
-    summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
-    return summary;
-  }
-
-  /**
-   * Compute the difference between two snapshots (or a snapshot and the current
-   * directory) of the directory.
-   * 
-   * @param from The name of the start point of the comparison. Null indicating
-   *          the current tree.
-   * @param to The name of the end point. Null indicating the current tree.
-   * @return The difference between the start/end points.
-   * @throws SnapshotException If there is no snapshot matching the starting
-   *           point, or if endSnapshotName is not null but cannot be identified
-   *           as a previous snapshot.
-   */
-  SnapshotDiffInfo computeDiff(final String from, final String to)
-      throws SnapshotException {
-    Snapshot fromSnapshot = getSnapshotByName(from);
-    Snapshot toSnapshot = getSnapshotByName(to);
-    // if the start point is equal to the end point, return null
-    if (from.equals(to)) {
-      return null;
-    }
-    SnapshotDiffInfo diffs = new SnapshotDiffInfo(this, fromSnapshot,
-        toSnapshot);
-    computeDiffRecursively(this, new ArrayList<byte[]>(), diffs);
-    return diffs;
-  }
-  
-  /**
-   * Find the snapshot matching the given name.
-   * 
-   * @param snapshotName The name of the snapshot.
-   * @return The corresponding snapshot. Null if snapshotName is null or empty.
-   * @throws SnapshotException If snapshotName is not null or empty, but there
-   *           is no snapshot matching the name.
-   */
-  private Snapshot getSnapshotByName(String snapshotName)
-      throws SnapshotException {
-    Snapshot s = null;
-    if (snapshotName != null && !snapshotName.isEmpty()) {
-      final int index = searchSnapshot(DFSUtil.string2Bytes(snapshotName));
-      if (index < 0) {
-        throw new SnapshotException("Cannot find the snapshot of directory "
-            + this.getFullPathName() + " with name " + snapshotName);
-      }
-      s = snapshotsByNames.get(index);
-    }
-    return s;
-  }
-  
-  /**
-   * Recursively compute the difference between snapshots under a given
-   * directory/file.
-   * @param node The directory/file under which the diff is computed. 
-   * @param parentPath Relative path (corresponding to the snapshot root) of 
-   *                   the node's parent.
-   * @param diffReport data structure used to store the diff.
-   */
-  private void computeDiffRecursively(INode node, List<byte[]> parentPath,
-      SnapshotDiffInfo diffReport) {
-    final Snapshot earlierSnapshot = diffReport.isFromEarlier() ?
-        diffReport.getFrom() : diffReport.getTo();
-    final Snapshot laterSnapshot = diffReport.isFromEarlier() ?
-        diffReport.getTo() : diffReport.getFrom();
-    byte[][] relativePath = parentPath.toArray(new byte[parentPath.size()][]);
-    if (node.isDirectory()) {
-      final ChildrenDiff diff = new ChildrenDiff();
-      INodeDirectory dir = node.asDirectory();
-      DirectoryWithSnapshotFeature sf = dir.getDirectoryWithSnapshotFeature();
-      if (sf != null) {
-        boolean change = sf.computeDiffBetweenSnapshots(earlierSnapshot,
-            laterSnapshot, diff, dir);
-        if (change) {
-          diffReport.addDirDiff(dir, relativePath, diff);
-        }
-      }
-      ReadOnlyList<INode> children = dir.getChildrenList(earlierSnapshot
-          .getId());
-      for (INode child : children) {
-        final byte[] name = child.getLocalNameBytes();
-        boolean toProcess = diff.searchIndex(ListType.DELETED, name) < 0;
-        if (!toProcess && child instanceof INodeReference.WithName) {
-          byte[][] renameTargetPath = findRenameTargetPath((WithName) child,
-              laterSnapshot == null ? Snapshot.CURRENT_STATE_ID : 
-                                      laterSnapshot.getId());
-          if (renameTargetPath != null) {
-            toProcess = true;
-            diffReport.setRenameTarget(child.getId(), renameTargetPath);
-          }
-        }
-        if (toProcess) {
-          parentPath.add(name);
-          computeDiffRecursively(child, parentPath, diffReport);
-          parentPath.remove(parentPath.size() - 1);
-        }
-      }
-    } else if (node.isFile() && node.asFile().isWithSnapshot()) {
-      INodeFile file = node.asFile();
-      boolean change = file.getFileWithSnapshotFeature()
-          .changedBetweenSnapshots(file, earlierSnapshot, laterSnapshot);
-      if (change) {
-        diffReport.addFileDiff(file, relativePath);
-      }
-    }
-  }
-
-  /**
-   * We just found a deleted WithName node as the source of a rename operation.
-   * However, we should include it in our snapshot diff report as rename only
-   * if the rename target is also under the same snapshottable directory.
-   */
-  private byte[][] findRenameTargetPath(INodeReference.WithName wn,
-      final int snapshotId) {
-    INode inode = wn.getReferredINode();
-    final LinkedList<byte[]> ancestors = Lists.newLinkedList();
-    while (inode != null) {
-      if (inode == this) {
-        return ancestors.toArray(new byte[ancestors.size()][]);
-      }
-      if (inode instanceof INodeReference.WithCount) {
-        inode = ((WithCount) inode).getParentRef(snapshotId);
-      } else {
-        INode parent = inode.getParentReference() != null ? inode
-            .getParentReference() : inode.getParent();
-        if (parent != null && parent instanceof INodeDirectory) {
-          int sid = parent.asDirectory().searchChild(inode);
-          if (sid < snapshotId) {
-            return null;
-          }
-        }
-        if (!(parent instanceof WithCount)) {
-          ancestors.addFirst(inode.getLocalNameBytes());
-        }
-        inode = parent;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Replace itself with {@link INodeDirectoryWithSnapshot} or
-   * {@link INodeDirectory} depending on the latest snapshot.
-   */
-  INodeDirectory replaceSelf(final int latestSnapshotId, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) {
-      Preconditions.checkState(getDirectoryWithSnapshotFeature()
-          .getLastSnapshotId() == Snapshot.CURRENT_STATE_ID, "this=%s", this);
-    }
-    INodeDirectory dir = replaceSelf4INodeDirectory(inodeMap);
-    if (latestSnapshotId != Snapshot.CURRENT_STATE_ID) {
-      dir.recordModification(latestSnapshotId);
-    }
-    return dir;
-  }
-
-  @Override
-  public String toDetailString() {
-    return super.toDetailString() + ", snapshotsByNames=" + snapshotsByNames;
-  }
-
-  @Override
-  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
-      int snapshot) {
-    super.dumpTreeRecursively(out, prefix, snapshot);
-
-    if (snapshot == Snapshot.CURRENT_STATE_ID) {
-      out.println();
-      out.print(prefix);
-
-      out.print("Snapshot of ");
-      final String name = getLocalName();
-      out.print(name.isEmpty()? "/": name);
-      out.print(": quota=");
-      out.print(getSnapshotQuota());
-
-      int n = 0;
-      for(DirectoryDiff diff : getDiffs()) {
-        if (diff.isSnapshotRoot()) {
-          n++;
-        }
-      }
-      Preconditions.checkState(n == snapshotsByNames.size(), "#n=" + n
-          + ", snapshotsByNames.size()=" + snapshotsByNames.size());
-      out.print(", #snapshot=");
-      out.println(n);
-
-      dumpTreeRecursively(out, prefix, new Iterable<SnapshotAndINode>() {
-        @Override
-        public Iterator<SnapshotAndINode> iterator() {
-          return new Iterator<SnapshotAndINode>() {
-            final Iterator<DirectoryDiff> i = getDiffs().iterator();
-            private DirectoryDiff next = findNext();
-  
-            private DirectoryDiff findNext() {
-              for(; i.hasNext(); ) {
-                final DirectoryDiff diff = i.next();
-                if (diff.isSnapshotRoot()) {
-                  return diff;
-                }
-              }
-              return null;
-            }
-
-            @Override
-            public boolean hasNext() {
-              return next != null;
-            }
-  
-            @Override
-            public SnapshotAndINode next() {
-              final SnapshotAndINode pair = new SnapshotAndINode(next
-                  .getSnapshotId(), getSnapshotById(next.getSnapshotId())
-                  .getRoot());
-              next = findNext();
-              return pair;
-            }
-  
-            @Override
-            public void remove() {
-              throw new UnsupportedOperationException();
-            }
-          };
-        }
-      });
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
index c1450e0..56d3418 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
@@ -184,15 +184,14 @@ public class Snapshot implements Comparable<byte[]> {
   /** The root directory of the snapshot. */
   private final Root root;
 
-  Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
+  Snapshot(int id, String name, INodeDirectory dir) {
     this(id, dir, dir);
     this.root.setLocalName(DFSUtil.string2Bytes(name));
   }
 
-  Snapshot(int id, INodeDirectory dir, INodeDirectorySnapshottable parent) {
+  Snapshot(int id, INodeDirectory dir, INodeDirectory parent) {
     this.id = id;
     this.root = new Root(dir);
-
     this.root.setParent(parent);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java
index f709f87..197b8ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotDiffInfo.java
@@ -99,7 +99,7 @@ class SnapshotDiffInfo {
   }
 
   /** The root directory of the snapshots */
-  private final INodeDirectorySnapshottable snapshotRoot;
+  private final INodeDirectory snapshotRoot;
   /** The starting point of the difference */
   private final Snapshot from;
   /** The end point of the difference */
@@ -122,8 +122,8 @@ class SnapshotDiffInfo {
   private final Map<Long, RenameEntry> renameMap =
       new HashMap<Long, RenameEntry>();
 
-  SnapshotDiffInfo(INodeDirectorySnapshottable snapshotRoot, Snapshot start,
-      Snapshot end) {
+  SnapshotDiffInfo(INodeDirectory snapshotRoot, Snapshot start, Snapshot end) {
+    Preconditions.checkArgument(snapshotRoot.isSnapshottable());
     this.snapshotRoot = snapshotRoot;
     this.from = start;
     this.to = end;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b68818c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
index 0194898..fcab53a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.tools.snapshot.SnapshotDiff;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A helper class defining static methods for reading/writing snapshot related
  * information from/to FSImage.
@@ -52,17 +54,19 @@ public class SnapshotFSImageFormat {
    * @param out The {@link DataOutput} to write.
    * @throws IOException
    */
-  public static void saveSnapshots(INodeDirectorySnapshottable current,
-      DataOutput out) throws IOException {
+  public static void saveSnapshots(INodeDirectory current, DataOutput out)
+      throws IOException {
+    DirectorySnapshottableFeature sf = current.getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
     // list of snapshots in snapshotsByNames
-    ReadOnlyList<Snapshot> snapshots = current.getSnapshotsByNames();
+    ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
     out.writeInt(snapshots.size());
     for (Snapshot s : snapshots) {
       // write the snapshot id
       out.writeInt(s.getId());
     }
     // snapshot quota
-    out.writeInt(current.getSnapshotQuota());
+    out.writeInt(sf.getSnapshotQuota());
   }
 
   /**
@@ -216,19 +220,22 @@ public class SnapshotFSImageFormat {
    * @param loader
    *          The loader
    */
-  public static void loadSnapshotList(
-      INodeDirectorySnapshottable snapshottableParent, int numSnapshots,
-      DataInput in, FSImageFormat.Loader loader) throws IOException {
+  public static void loadSnapshotList(INodeDirectory snapshottableParent,
+      int numSnapshots, DataInput in, FSImageFormat.Loader loader)
+      throws IOException {
+    DirectorySnapshottableFeature sf = snapshottableParent
+        .getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
     for (int i = 0; i < numSnapshots; i++) {
       // read snapshots
       final Snapshot s = loader.getSnapshot(in);
       s.getRoot().setParent(snapshottableParent);
-      snapshottableParent.addSnapshot(s);
+      sf.addSnapshot(s);
     }
     int snapshotQuota = in.readInt();
     snapshottableParent.setSnapshotQuota(snapshotQuota);
   }
-  
+
   /**
    * Load the {@link SnapshotDiff} list for the INodeDirectoryWithSnapshot
    * directory.


Mime
View raw message