hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject svn commit: r1556688 [2/5] - in /hadoop/common/branches/HDFS-5698/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/...
Date Thu, 09 Jan 2014 00:53:08 GMT
Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Jan  9 00:53:04 2014
@@ -614,14 +614,14 @@ public class FSDirectory implements Clos
     INode srcChild = srcIIP.getLastINode();
     final byte[] srcChildName = srcChild.getLocalNameBytes();
     final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshot());
+        srcIIP.getLatestSnapshotId());
     final boolean srcChildIsReference = srcChild.isReference();
     
     // Record the snapshot on srcChild. After the rename, before any new 
     // snapshot is taken on the dst tree, changes will be recorded in the latest
     // snapshot of the src tree.
     if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot());
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
       srcIIP.setLastINode(srcChild);
     }
     
@@ -629,17 +629,16 @@ public class FSDirectory implements Clos
     final INodeReference.WithCount withCount;
     Quota.Counts oldSrcCounts = Quota.Counts.newInstance();
     int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.INVALID_ID;
+        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
     if (isSrcInSnapshot) {
       final INodeReference.WithName withName = 
           srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
-              srcChild, srcIIP.getLatestSnapshot()); 
+              srcChild, srcIIP.getLatestSnapshotId()); 
       withCount = (INodeReference.WithCount) withName.getReferredINode();
       srcChild = withName;
       srcIIP.setLastINode(srcChild);
       // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true,
-          Snapshot.INVALID_ID);
+      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
     } else if (srcChildIsReference) {
       // srcChild is reference but srcChild is not in latest snapshot
       withCount = (WithCount) srcChild.asReference().getReferredINode();
@@ -675,10 +674,9 @@ public class FSDirectory implements Clos
         toDst = srcChild;
       } else {
         withCount.getReferredINode().setLocalName(dstChildName);
-        Snapshot dstSnapshot = dstIIP.getLatestSnapshot();
+        int dstSnapshotId = dstIIP.getLatestSnapshotId();
         final INodeReference.DstReference ref = new INodeReference.DstReference(
-            dstParent.asDirectory(), withCount,
-            dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
+            dstParent.asDirectory(), withCount, dstSnapshotId);
         toDst = ref;
       }
       
@@ -690,9 +688,9 @@ public class FSDirectory implements Clos
         }
         // update modification time of dst and the parent of src
         final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
         dstParent = dstIIP.getINode(-2); // refresh dstParent
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
         // update moved leases with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);     
 
@@ -700,7 +698,7 @@ public class FSDirectory implements Clos
         if (isSrcInSnapshot) {
           // get the counts after rename
           Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false, Snapshot.INVALID_ID);
+              Quota.Counts.newInstance(), false);
           newSrcCounts.subtract(oldSrcCounts);
           srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
               newSrcCounts.get(Quota.DISKSPACE), false);
@@ -732,8 +730,7 @@ public class FSDirectory implements Clos
         if (isSrcInSnapshot) {
           // srcParent must have snapshot feature since isSrcInSnapshot is true
           // and src node has been removed from srcParent 
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild,
-              srcIIP.getLatestSnapshot());
+          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
         } else {
           // original srcChild is not in latest snapshot, we only need to add
           // the srcChild back
@@ -836,7 +833,7 @@ public class FSDirectory implements Clos
       }
       if (dstInode.isDirectory()) {
         final ReadOnlyList<INode> children = dstInode.asDirectory()
-            .getChildrenList(null);
+            .getChildrenList(Snapshot.CURRENT_STATE_ID);
         if (!children.isEmpty()) {
           error = "rename destination directory is not empty: " + dst;
           NameNode.stateChangeLog.warn(
@@ -867,31 +864,30 @@ public class FSDirectory implements Clos
     INode srcChild = srcIIP.getLastINode();
     final byte[] srcChildName = srcChild.getLocalNameBytes();
     final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshot());
+        srcIIP.getLatestSnapshotId());
     final boolean srcChildIsReference = srcChild.isReference();
     
     // Record the snapshot on srcChild. After the rename, before any new 
     // snapshot is taken on the dst tree, changes will be recorded in the latest
     // snapshot of the src tree.
     if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot());
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
       srcIIP.setLastINode(srcChild);
     }
     
     // check srcChild for reference
     final INodeReference.WithCount withCount;
     int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.INVALID_ID;
+        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
     Quota.Counts oldSrcCounts = Quota.Counts.newInstance();    
     if (isSrcInSnapshot) {
       final INodeReference.WithName withName = srcIIP.getINode(-2).asDirectory()
-          .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshot()); 
+          .replaceChild4ReferenceWithName(srcChild, srcIIP.getLatestSnapshotId()); 
       withCount = (INodeReference.WithCount) withName.getReferredINode();
       srcChild = withName;
       srcIIP.setLastINode(srcChild);
       // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true,
-          Snapshot.INVALID_ID);
+      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
     } else if (srcChildIsReference) {
       // srcChild is reference but srcChild is not in latest snapshot
       withCount = (WithCount) srcChild.asReference().getReferredINode();
@@ -935,10 +931,9 @@ public class FSDirectory implements Clos
         toDst = srcChild;
       } else {
         withCount.getReferredINode().setLocalName(dstChildName);
-        Snapshot dstSnapshot = dstIIP.getLatestSnapshot();
+        int dstSnapshotId = dstIIP.getLatestSnapshotId();
         final INodeReference.DstReference ref = new INodeReference.DstReference(
-            dstIIP.getINode(-2).asDirectory(), withCount,
-            dstSnapshot == null ? Snapshot.INVALID_ID : dstSnapshot.getId());
+            dstIIP.getINode(-2).asDirectory(), withCount, dstSnapshotId);
         toDst = ref;
       }
 
@@ -952,9 +947,9 @@ public class FSDirectory implements Clos
         }
 
         final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshotId());
         dstParent = dstIIP.getINode(-2);
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshotId());
         // update moved lease with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);
 
@@ -964,8 +959,8 @@ public class FSDirectory implements Clos
           undoRemoveDst = false;
           BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
           List<INode> removedINodes = new ChunkedArrayList<INode>();
-          filesDeleted = removedDst.cleanSubtree(null,
-              dstIIP.getLatestSnapshot(), collectedBlocks, removedINodes, true)
+          filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+              dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes, true)
               .get(Quota.NAMESPACE);
           getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
               removedINodes);
@@ -981,7 +976,7 @@ public class FSDirectory implements Clos
         if (isSrcInSnapshot) {
           // get the counts after rename
           Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false, Snapshot.INVALID_ID);
+              Quota.Counts.newInstance(), false);
           newSrcCounts.subtract(oldSrcCounts);
           srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
               newSrcCounts.get(Quota.DISKSPACE), false);
@@ -1012,8 +1007,7 @@ public class FSDirectory implements Clos
         }
         
         if (srcParent.isWithSnapshot()) {
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild,
-              srcIIP.getLatestSnapshot());
+          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
         } else {
           // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
           // the srcChild back
@@ -1024,7 +1018,7 @@ public class FSDirectory implements Clos
         // Rename failed - restore dst
         if (dstParent.isDirectory() && dstParent.asDirectory().isWithSnapshot()) {
           dstParent.asDirectory().undoRename4DstParent(removedDst,
-              dstIIP.getLatestSnapshot());
+              dstIIP.getLatestSnapshotId());
         } else {
           addLastINodeNoQuotaCheck(dstIIP, removedDst);
         }
@@ -1088,7 +1082,7 @@ public class FSDirectory implements Clos
       updateCount(iip, 0, dsDelta, true);
     }
 
-    file = file.setFileReplication(replication, iip.getLatestSnapshot(),
+    file = file.setFileReplication(replication, iip.getLatestSnapshotId(),
         inodeMap);
     
     final short newBR = file.getBlockReplication(); 
@@ -1155,7 +1149,7 @@ public class FSDirectory implements Clos
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
-    inode.setPermission(permissions, inodesInPath.getLatestSnapshot());
+    inode.setPermission(permissions, inodesInPath.getLatestSnapshotId());
   }
 
   void setOwner(String src, String username, String groupname)
@@ -1180,10 +1174,10 @@ public class FSDirectory implements Clos
       throw new FileNotFoundException("File does not exist: " + src);
     }
     if (username != null) {
-      inode = inode.setUser(username, inodesInPath.getLatestSnapshot());
+      inode = inode.setUser(username, inodesInPath.getLatestSnapshotId());
     }
     if (groupname != null) {
-      inode.setGroup(groupname, inodesInPath.getLatestSnapshot());
+      inode.setGroup(groupname, inodesInPath.getLatestSnapshotId());
     }
   }
 
@@ -1225,12 +1219,12 @@ public class FSDirectory implements Clos
     final INode[] trgINodes = trgIIP.getINodes();
     final INodeFile trgInode = trgIIP.getLastINode().asFile();
     INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
-    final Snapshot trgLatestSnapshot = trgIIP.getLatestSnapshot();
+    final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
     
     final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
     for(int i = 0; i < srcs.length; i++) {
       final INodesInPath iip = getINodesInPath4Write(srcs[i]);
-      final Snapshot latest = iip.getLatestSnapshot();
+      final int latest = iip.getLatestSnapshotId();
       final INode inode = iip.getLastINode();
 
       // check if the file in the latest snapshot
@@ -1354,7 +1348,7 @@ public class FSDirectory implements Clos
         //not found or not a directory
         return false;
       }
-      final Snapshot s = inodesInPath.getPathSnapshot();
+      final int s = inodesInPath.getPathSnapshotId();
       return !inode.asDirectory().getChildrenList(s).isEmpty();
     } finally {
       readUnlock();
@@ -1408,7 +1402,7 @@ public class FSDirectory implements Clos
     }
 
     // record modification
-    final Snapshot latestSnapshot = iip.getLatestSnapshot();
+    final int latestSnapshot = iip.getLatestSnapshotId();
     targetNode = targetNode.recordModification(latestSnapshot);
     iip.setLastINode(targetNode);
 
@@ -1429,8 +1423,8 @@ public class FSDirectory implements Clos
     if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
       targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
     } else {
-      Quota.Counts counts = targetNode.cleanSubtree(null, latestSnapshot,
-          collectedBlocks, removedINodes, true);
+      Quota.Counts counts = targetNode.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+          latestSnapshot, collectedBlocks, removedINodes, true);
       parent.addSpaceConsumed(-counts.get(Quota.NAMESPACE),
           -counts.get(Quota.DISKSPACE), true);
       removed = counts.get(Quota.NAMESPACE);
@@ -1467,7 +1461,7 @@ public class FSDirectory implements Clos
           }
         }
       } 
-      for (INode child : targetDir.getChildrenList(null)) {
+      for (INode child : targetDir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
         checkSnapshot(child, snapshottableDirs);
       }
     }
@@ -1491,7 +1485,7 @@ public class FSDirectory implements Clos
         return getSnapshotsListing(srcs, startAfter);
       }
       final INodesInPath inodesInPath = rootDir.getLastINodeInPath(srcs, true);
-      final Snapshot snapshot = inodesInPath.getPathSnapshot();
+      final int snapshot = inodesInPath.getPathSnapshotId();
       final INode targetNode = inodesInPath.getINode(0);
       if (targetNode == null)
         return null;
@@ -1543,7 +1537,8 @@ public class FSDirectory implements Clos
     final HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
     for (int i = 0; i < numOfListing; i++) {
       Root sRoot = snapshots.get(i + skipSize).getRoot();
-      listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot, null);
+      listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot,
+          Snapshot.CURRENT_STATE_ID);
     }
     return new DirectoryListing(
         listing, snapshots.size() - skipSize - numOfListing);
@@ -1566,7 +1561,7 @@ public class FSDirectory implements Clos
       final INodesInPath inodesInPath = rootDir.getLastINodeInPath(srcs, resolveLink);
       final INode i = inodesInPath.getINode(0);
       return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i,
-          inodesInPath.getPathSnapshot());
+          inodesInPath.getPathSnapshotId());
     } finally {
       readUnlock();
     }
@@ -2129,7 +2124,7 @@ public class FSDirectory implements Clos
     }
 
     final INodeDirectory parent = pathComponents[pos-1].asDirectory();
-    final int count = parent.getChildrenList(null).size();
+    final int count = parent.getChildrenList(Snapshot.CURRENT_STATE_ID).size();
     if (count >= maxDirItems) {
       final MaxDirectoryItemsExceededException e
           = new MaxDirectoryItemsExceededException(maxDirItems, count);
@@ -2193,7 +2188,7 @@ public class FSDirectory implements Clos
     final INodeDirectory parent = inodes[pos-1].asDirectory();
     boolean added = false;
     try {
-      added = parent.addChild(child, true, iip.getLatestSnapshot());
+      added = parent.addChild(child, true, iip.getLatestSnapshotId());
     } catch (QuotaExceededException e) {
       updateCountNoQuotaCheck(iip, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
@@ -2228,7 +2223,7 @@ public class FSDirectory implements Clos
    */
   private long removeLastINode(final INodesInPath iip)
       throws QuotaExceededException {
-    final Snapshot latestSnapshot = iip.getLatestSnapshot();
+    final int latestSnapshot = iip.getLatestSnapshotId();
     final INode last = iip.getLastINode();
     final INodeDirectory parent = iip.getINode(-2).asDirectory();
     if (!parent.removeChild(last, latestSnapshot)) {
@@ -2382,7 +2377,7 @@ public class FSDirectory implements Clos
         return null;
       }
 
-      final Snapshot latest = iip.getLatestSnapshot();
+      final int latest = iip.getLatestSnapshotId();
       dirNode = dirNode.recordModification(latest);
       dirNode.setQuota(nsQuota, dsQuota);
       return dirNode;
@@ -2425,11 +2420,11 @@ public class FSDirectory implements Clos
    * Sets the access time on the file/directory. Logs it in the transaction log.
    */
   void setTimes(String src, INode inode, long mtime, long atime, boolean force,
-      Snapshot latest) throws QuotaExceededException {
+      int latestSnapshotId) throws QuotaExceededException {
     boolean status = false;
     writeLock();
     try {
-      status = unprotectedSetTimes(inode, mtime, atime, force, latest);
+      status = unprotectedSetTimes(inode, mtime, atime, force, latestSnapshotId);
     } finally {
       writeUnlock();
     }
@@ -2443,11 +2438,11 @@ public class FSDirectory implements Clos
     assert hasWriteLock();
     final INodesInPath i = getLastINodeInPath(src); 
     return unprotectedSetTimes(i.getLastINode(), mtime, atime, force,
-        i.getLatestSnapshot());
+        i.getLatestSnapshotId());
   }
 
   private boolean unprotectedSetTimes(INode inode, long mtime,
-      long atime, boolean force, Snapshot latest) throws QuotaExceededException {
+      long atime, boolean force, int latest) throws QuotaExceededException {
     assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {
@@ -2455,7 +2450,7 @@ public class FSDirectory implements Clos
       status = true;
     }
     if (atime != -1) {
-      long inodeTime = inode.getAccessTime(null);
+      long inodeTime = inode.getAccessTime();
 
       // if the last access time update was within the last precision interval, then
       // no need to store access time
@@ -2495,7 +2490,7 @@ public class FSDirectory implements Clos
    * @throws IOException if any error occurs
    */
   private HdfsFileStatus createFileStatus(byte[] path, INode node,
-      boolean needLocation, Snapshot snapshot) throws IOException {
+      boolean needLocation, int snapshot) throws IOException {
     if (needLocation) {
       return createLocatedFileStatus(path, node, snapshot);
     } else {
@@ -2506,7 +2501,7 @@ public class FSDirectory implements Clos
    * Create FileStatus by file INode 
    */
    HdfsFileStatus createFileStatus(byte[] path, INode node,
-       Snapshot snapshot) {
+       int snapshot) {
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
@@ -2539,7 +2534,7 @@ public class FSDirectory implements Clos
    * Create FileStatus with location info by file INode
    */
   private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path,
-      INode node, Snapshot snapshot) throws IOException {
+      INode node, int snapshot) throws IOException {
     assert hasReadLock();
     long size = 0; // length is zero for directories
     short replication = 0;
@@ -2551,7 +2546,7 @@ public class FSDirectory implements Clos
       replication = fileNode.getFileReplication(snapshot);
       blocksize = fileNode.getPreferredBlockSize();
 
-      final boolean inSnapshot = snapshot != null; 
+      final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID; 
       final boolean isUc = inSnapshot ? false : fileNode.isUnderConstruction();
       final long fileSize = !inSnapshot && isUc ? 
           fileNode.computeFileSizeNotIncludingLastUcBlock() : size;

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Jan  9 00:53:04 2014
@@ -24,14 +24,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.EnumMap;
-import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -79,6 +77,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
@@ -327,7 +326,7 @@ public class FSEditLogLoader {
         // add the op into retry cache if necessary
         if (toAddRetryCache) {
           HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
-              HdfsFileStatus.EMPTY_NAME, newFile, null);
+              HdfsFileStatus.EMPTY_NAME, newFile, Snapshot.CURRENT_STATE_ID);
           fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
               addCloseOp.rpcCallId, stat);
         }
@@ -340,7 +339,7 @@ public class FSEditLogLoader {
           }
           LocatedBlock lb = fsNamesys.prepareFileForWrite(addCloseOp.path,
               oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
-              false, iip.getLatestSnapshot(), false);
+              false, iip.getLatestSnapshotId(), false);
           newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
               addCloseOp.path, true);
           
@@ -356,8 +355,8 @@ public class FSEditLogLoader {
       // update the block list.
       
       // Update the salient file attributes.
-      newFile.setAccessTime(addCloseOp.atime, null);
-      newFile.setModificationTime(addCloseOp.mtime, null);
+      newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
+      newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
       updateBlocks(fsDir, addCloseOp, newFile);
       break;
     }
@@ -375,8 +374,8 @@ public class FSEditLogLoader {
       final INodeFile file = INodeFile.valueOf(iip.getINode(0), addCloseOp.path);
 
       // Update the salient file attributes.
-      file.setAccessTime(addCloseOp.atime, null);
-      file.setModificationTime(addCloseOp.mtime, null);
+      file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
+      file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
       updateBlocks(fsDir, addCloseOp, file);
 
       // Now close the file

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Jan  9 00:53:04 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -405,60 +406,64 @@ public class FSImage implements Closeabl
     // Directories that don't have previous state do not rollback
     boolean canRollback = false;
     FSImage prevState = new FSImage(conf);
-    prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File prevDir = sd.getPreviousDir();
-      if (!prevDir.exists()) {  // use current directory then
-        LOG.info("Storage directory " + sd.getRoot()
-                 + " does not contain previous fs state.");
-        // read and verify consistency with other directories
-        storage.readProperties(sd);
-        continue;
-      }
+    try {
+      prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
+      for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        File prevDir = sd.getPreviousDir();
+        if (!prevDir.exists()) {  // use current directory then
+          LOG.info("Storage directory " + sd.getRoot()
+            + " does not contain previous fs state.");
+          // read and verify consistency with other directories
+          storage.readProperties(sd);
+          continue;
+        }
 
-      // read and verify consistency of the prev dir
-      prevState.getStorage().readPreviousVersionProperties(sd);
+        // read and verify consistency of the prev dir
+        prevState.getStorage().readPreviousVersionProperties(sd);
 
-      if (prevState.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
-        throw new IOException(
-          "Cannot rollback to storage version " +
-          prevState.getLayoutVersion() +
-          " using this version of the NameNode, which uses storage version " +
-          HdfsConstants.LAYOUT_VERSION + ". " +
-          "Please use the previous version of HDFS to perform the rollback.");
-      }
-      canRollback = true;
-    }
-    if (!canRollback)
-      throw new IOException("Cannot rollback. None of the storage "
-                            + "directories contain previous fs state.");
+        if (prevState.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
+          throw new IOException(
+            "Cannot rollback to storage version " +
+                prevState.getLayoutVersion() +
+                " using this version of the NameNode, which uses storage version " +
+                HdfsConstants.LAYOUT_VERSION + ". " +
+              "Please use the previous version of HDFS to perform the rollback.");
+        }
+        canRollback = true;
+      }
+      if (!canRollback)
+        throw new IOException("Cannot rollback. None of the storage "
+            + "directories contain previous fs state.");
+
+      // Now that we know all directories are going to be consistent
+      // Do rollback for each directory containing previous state
+      for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+        StorageDirectory sd = it.next();
+        File prevDir = sd.getPreviousDir();
+        if (!prevDir.exists())
+          continue;
 
-    // Now that we know all directories are going to be consistent
-    // Do rollback for each directory containing previous state
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File prevDir = sd.getPreviousDir();
-      if (!prevDir.exists())
-        continue;
+        LOG.info("Rolling back storage directory " + sd.getRoot()
+          + ".\n   new LV = " + prevState.getStorage().getLayoutVersion()
+          + "; new CTime = " + prevState.getStorage().getCTime());
+        File tmpDir = sd.getRemovedTmp();
+        assert !tmpDir.exists() : "removed.tmp directory must not exist.";
+        // rename current to tmp
+        File curDir = sd.getCurrentDir();
+        assert curDir.exists() : "Current directory must exist.";
+        NNStorage.rename(curDir, tmpDir);
+        // rename previous to current
+        NNStorage.rename(prevDir, curDir);
 
-      LOG.info("Rolling back storage directory " + sd.getRoot()
-               + ".\n   new LV = " + prevState.getStorage().getLayoutVersion()
-               + "; new CTime = " + prevState.getStorage().getCTime());
-      File tmpDir = sd.getRemovedTmp();
-      assert !tmpDir.exists() : "removed.tmp directory must not exist.";
-      // rename current to tmp
-      File curDir = sd.getCurrentDir();
-      assert curDir.exists() : "Current directory must exist.";
-      NNStorage.rename(curDir, tmpDir);
-      // rename previous to current
-      NNStorage.rename(prevDir, curDir);
-
-      // delete tmp dir
-      NNStorage.deleteDir(tmpDir);
-      LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
+        // delete tmp dir
+        NNStorage.deleteDir(tmpDir);
+        LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
+      }
+      isUpgradeFinalized = true;
+    } finally {
+      prevState.close();
     }
-    isUpgradeFinalized = true;
   }
 
   private void doFinalize(StorageDirectory sd) throws IOException {
@@ -766,7 +771,7 @@ public class FSImage implements Closeabl
 
     dir.computeQuotaUsage4CurrentDirectory(counts);
     
-    for (INode child : dir.getChildrenList(null)) {
+    for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
       if (child.isDirectory()) {
         updateCountForQuotaRecursively(child.asDirectory(), counts);
       } else {

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Thu Jan  9 00:53:04 2014
@@ -1111,7 +1111,8 @@ public class FSImageFormat {
         return;
       }
       
-      final ReadOnlyList<INode> children = current.getChildrenList(null);
+      final ReadOnlyList<INode> children = current
+          .getChildrenList(Snapshot.CURRENT_STATE_ID);
       int dirNum = 0;
       List<INodeDirectory> snapshotDirs = null;
       DirectoryWithSnapshotFeature sf = current.getDirectoryWithSnapshotFeature();

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jan  9 00:53:04 2014
@@ -176,7 +176,15 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -929,7 +937,6 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       if (blockManager != null) blockManager.close();
-      cacheManager.deactivate();
     } finally {
       writeUnlock();
     }
@@ -999,7 +1006,7 @@ public class FSNamesystem implements Nam
           editLogRollerThreshold, editLogRollerInterval));
       nnEditLogRoller.start();
 
-      cacheManager.activate();
+      cacheManager.startMonitorThread();
       blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
     } finally {
       writeUnlock();
@@ -1050,7 +1057,9 @@ public class FSNamesystem implements Nam
         // so that the tailer starts from the right spot.
         dir.fsImage.updateLastAppliedTxIdFromWritten();
       }
-      cacheManager.deactivate();
+      cacheManager.stopMonitorThread();
+      cacheManager.clearDirectiveStats();
+      blockManager.getDatanodeManager().clearPendingCachingCommands();
       blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
     } finally {
       writeUnlock();
@@ -1616,11 +1625,11 @@ public class FSNamesystem implements Nam
             if (isReadOp) {
               continue;
             }
-            dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshot());
+            dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshotId());
           }
         }
         final long fileSize = iip.isSnapshot() ?
-            inode.computeFileSize(iip.getPathSnapshot())
+            inode.computeFileSize(iip.getPathSnapshotId())
             : inode.computeFileSizeNotIncludingLastUcBlock();
         boolean isUc = inode.isUnderConstruction();
         if (iip.isSnapshot()) {
@@ -1868,7 +1877,7 @@ public class FSNamesystem implements Nam
       final INodesInPath iip = dir.getINodesInPath4Write(src);
       final INode inode = iip.getLastINode();
       if (inode != null) {
-        dir.setTimes(src, inode, mtime, atime, true, iip.getLatestSnapshot());
+        dir.setTimes(src, inode, mtime, atime, true, iip.getLatestSnapshotId());
         resultingStat = getAuditFileInfo(src, false);
       } else {
         throw new FileNotFoundException("File/Directory " + src + " does not exist.");
@@ -2264,7 +2273,7 @@ public class FSNamesystem implements Nam
       final DatanodeDescriptor clientNode = 
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
-          true, iip.getLatestSnapshot(), logRetryCache);
+          true, iip.getLatestSnapshotId(), logRetryCache);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
       throw ie;
@@ -2289,7 +2298,7 @@ public class FSNamesystem implements Nam
    */
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
       String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
+      boolean writeToEditLog, int latestSnapshot, boolean logRetryCache)
       throws IOException {
     file = file.recordModification(latestSnapshot);
     final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine,
@@ -2924,7 +2933,7 @@ public class FSNamesystem implements Nam
     }
 
     finalizeINodeFileUnderConstruction(src, pendingFile,
-        iip.getLatestSnapshot());
+        iip.getLatestSnapshotId());
     return true;
   }
 
@@ -3633,7 +3642,7 @@ public class FSNamesystem implements Nam
     // then reap lease immediately and close the file.
     if(nrCompleteBlocks == nrBlocks) {
       finalizeINodeFileUnderConstruction(src, pendingFile,
-          iip.getLatestSnapshot());
+          iip.getLatestSnapshotId());
       NameNode.stateChangeLog.warn("BLOCK*"
         + " internalReleaseLease: All existing blocks are COMPLETE,"
         + " lease removed, file closed.");
@@ -3682,7 +3691,7 @@ public class FSNamesystem implements Nam
       if(penultimateBlockMinReplication &&
           blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
-            iip.getLatestSnapshot());
+            iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
           + " internalReleaseLease: Committed blocks are minimally replicated,"
           + " lease removed, file closed.");
@@ -3713,7 +3722,7 @@ public class FSNamesystem implements Nam
         // We can remove this block and close the file.
         pendingFile.removeLastBlock(lastBlock);
         finalizeINodeFileUnderConstruction(src, pendingFile,
-            iip.getLatestSnapshot());
+            iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
             + "Removed empty last block and closed file.");
         return true;
@@ -3774,7 +3783,7 @@ public class FSNamesystem implements Nam
   }
 
   private void finalizeINodeFileUnderConstruction(String src,
-      INodeFile pendingFile, Snapshot latestSnapshot) throws IOException,
+      INodeFile pendingFile, int latestSnapshot) throws IOException,
       UnresolvedLinkException {
     assert hasWriteLock();
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
@@ -3974,7 +3983,7 @@ public class FSNamesystem implements Nam
 
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile,
-        Snapshot.findLatestSnapshot(pendingFile, null));
+        Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID));
 
     return src;
   }
@@ -7064,6 +7073,9 @@ public class FSNamesystem implements Nam
       return (Long) cacheEntry.getPayload();
     }
     boolean success = false;
+    if (!flags.contains(CacheFlag.FORCE)) {
+      cacheManager.waitForRescanIfNeeded();
+    }
     writeLock();
     Long result = null;
     try {
@@ -7105,6 +7117,9 @@ public class FSNamesystem implements Nam
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
+    if (!flags.contains(CacheFlag.FORCE)) {
+      cacheManager.waitForRescanIfNeeded();
+    }
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -7122,7 +7137,7 @@ public class FSNamesystem implements Nam
         getEditLog().logSync();
       }
       if (isAuditEnabled() && isExternalInvocation()) {
-        logAuditEvent(success, "addCacheDirective", null, null, null);
+        logAuditEvent(success, "modifyCacheDirective", null, null, null);
       }
       RetryCache.setState(cacheEntry, success);
     }
@@ -7164,6 +7179,7 @@ public class FSNamesystem implements Nam
     final FSPermissionChecker pc = isPermissionEnabled ?
         getPermissionChecker() : null;
     BatchedListEntries<CacheDirectiveEntry> results;
+    cacheManager.waitForRescanIfNeeded();
     readLock();
     boolean success = false;
     try {
@@ -7287,6 +7303,7 @@ public class FSNamesystem implements Nam
     BatchedListEntries<CachePoolEntry> results;
     checkOperation(OperationCategory.READ);
     boolean success = false;
+    cacheManager.waitForRescanIfNeeded();
     readLock();
     try {
       checkOperation(OperationCategory.READ);

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Thu Jan  9 00:53:04 2014
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -29,7 +28,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -142,54 +140,54 @@ class FSPermissionChecker {
     // check if (parentAccess != null) && file exists, then check sb
     // If resolveLink, the check is performed on the link target.
     final INodesInPath inodesInPath = root.getINodesInPath(path, resolveLink);
-    final Snapshot snapshot = inodesInPath.getPathSnapshot();
+    final int snapshotId = inodesInPath.getPathSnapshotId();
     final INode[] inodes = inodesInPath.getINodes();
     int ancestorIndex = inodes.length - 2;
     for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
         ancestorIndex--);
-    checkTraverse(inodes, ancestorIndex, snapshot);
+    checkTraverse(inodes, ancestorIndex, snapshotId);
 
     final INode last = inodes[inodes.length - 1];
     if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
         && inodes.length > 1 && last != null) {
-      checkStickyBit(inodes[inodes.length - 2], last, snapshot);
+      checkStickyBit(inodes[inodes.length - 2], last, snapshotId);
     }
     if (ancestorAccess != null && inodes.length > 1) {
-      check(inodes, ancestorIndex, snapshot, ancestorAccess);
+      check(inodes, ancestorIndex, snapshotId, ancestorAccess);
     }
     if (parentAccess != null && inodes.length > 1) {
-      check(inodes, inodes.length - 2, snapshot, parentAccess);
+      check(inodes, inodes.length - 2, snapshotId, parentAccess);
     }
     if (access != null) {
-      check(last, snapshot, access);
+      check(last, snapshotId, access);
     }
     if (subAccess != null) {
-      checkSubAccess(last, snapshot, subAccess);
+      checkSubAccess(last, snapshotId, subAccess);
     }
     if (doCheckOwner) {
-      checkOwner(last, snapshot);
+      checkOwner(last, snapshotId);
     }
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkOwner(INode inode, Snapshot snapshot
+  private void checkOwner(INode inode, int snapshotId
       ) throws AccessControlException {
-    if (inode != null && user.equals(inode.getUserName(snapshot))) {
+    if (inode != null && user.equals(inode.getUserName(snapshotId))) {
       return;
     }
     throw new AccessControlException("Permission denied");
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INode[] inodes, int last, Snapshot snapshot
+  private void checkTraverse(INode[] inodes, int last, int snapshotId
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
-      check(inodes[j], snapshot, FsAction.EXECUTE);
+      check(inodes[j], snapshotId, FsAction.EXECUTE);
     }
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkSubAccess(INode inode, Snapshot snapshot, FsAction access
+  private void checkSubAccess(INode inode, int snapshotId, FsAction access
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
       return;
@@ -198,9 +196,9 @@ class FSPermissionChecker {
     Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
     for(directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
       INodeDirectory d = directories.pop();
-      check(d, snapshot, access);
+      check(d, snapshotId, access);
 
-      for(INode child : d.getChildrenList(snapshot)) {
+      for(INode child : d.getChildrenList(snapshotId)) {
         if (child.isDirectory()) {
           directories.push(child.asDirectory());
         }
@@ -209,23 +207,23 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode[] inodes, int i, Snapshot snapshot, FsAction access
+  private void check(INode[] inodes, int i, int snapshotId, FsAction access
       ) throws AccessControlException {
-    check(i >= 0? inodes[i]: null, snapshot, access);
+    check(i >= 0? inodes[i]: null, snapshotId, access);
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode inode, Snapshot snapshot, FsAction access
+  private void check(INode inode, int snapshotId, FsAction access
       ) throws AccessControlException {
     if (inode == null) {
       return;
     }
-    FsPermission mode = inode.getFsPermission(snapshot);
+    FsPermission mode = inode.getFsPermission(snapshotId);
 
-    if (user.equals(inode.getUserName(snapshot))) { //user class
+    if (user.equals(inode.getUserName(snapshotId))) { //user class
       if (mode.getUserAction().implies(access)) { return; }
     }
-    else if (groups.contains(inode.getGroupName(snapshot))) { //group class
+    else if (groups.contains(inode.getGroupName(snapshotId))) { //group class
       if (mode.getGroupAction().implies(access)) { return; }
     }
     else { //other class
@@ -236,19 +234,19 @@ class FSPermissionChecker {
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkStickyBit(INode parent, INode inode, Snapshot snapshot
+  private void checkStickyBit(INode parent, INode inode, int snapshotId
       ) throws AccessControlException {
-    if(!parent.getFsPermission(snapshot).getStickyBit()) {
+    if(!parent.getFsPermission(snapshotId).getStickyBit()) {
       return;
     }
 
     // If this user is the directory owner, return
-    if(parent.getUserName(snapshot).equals(user)) {
+    if(parent.getUserName(snapshotId).equals(user)) {
       return;
     }
 
     // if this user is the file owner, return
-    if(inode.getUserName(snapshot).equals(user)) {
+    if(inode.getUserName(snapshotId).equals(user)) {
       return;
     }
 

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Thu Jan  9 00:53:04 2014
@@ -70,98 +70,101 @@ public abstract class INode implements I
   }
 
   /** Get the {@link PermissionStatus} */
-  abstract PermissionStatus getPermissionStatus(Snapshot snapshot);
+  abstract PermissionStatus getPermissionStatus(int snapshotId);
 
   /** The same as getPermissionStatus(null). */
   final PermissionStatus getPermissionStatus() {
-    return getPermissionStatus(null);
+    return getPermissionStatus(Snapshot.CURRENT_STATE_ID);
   }
 
   /**
-   * @param snapshot
-   *          if it is not null, get the result from the given snapshot;
-   *          otherwise, get the result from the current inode.
+   * @param snapshotId
+   *          if it is not {@link Snapshot#CURRENT_STATE_ID}, get the result
+   *          from the given snapshot; otherwise, get the result from the
+   *          current inode.
    * @return user name
    */
-  abstract String getUserName(Snapshot snapshot);
+  abstract String getUserName(int snapshotId);
 
-  /** The same as getUserName(null). */
+  /** The same as getUserName(Snapshot.CURRENT_STATE_ID). */
   @Override
   public final String getUserName() {
-    return getUserName(null);
+    return getUserName(Snapshot.CURRENT_STATE_ID);
   }
 
   /** Set user */
   abstract void setUser(String user);
 
   /** Set user */
-  final INode setUser(String user, Snapshot latest)
+  final INode setUser(String user, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latest);
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
     nodeToUpdate.setUser(user);
     return nodeToUpdate;
   }
   /**
-   * @param snapshot
-   *          if it is not null, get the result from the given snapshot;
-   *          otherwise, get the result from the current inode.
+   * @param snapshotId
+   *          if it is not {@link Snapshot#CURRENT_STATE_ID}, get the result
+   *          from the given snapshot; otherwise, get the result from the
+   *          current inode.
    * @return group name
    */
-  abstract String getGroupName(Snapshot snapshot);
+  abstract String getGroupName(int snapshotId);
 
-  /** The same as getGroupName(null). */
+  /** The same as getGroupName(Snapshot.CURRENT_STATE_ID). */
   @Override
   public final String getGroupName() {
-    return getGroupName(null);
+    return getGroupName(Snapshot.CURRENT_STATE_ID);
   }
 
   /** Set group */
   abstract void setGroup(String group);
 
   /** Set group */
-  final INode setGroup(String group, Snapshot latest)
+  final INode setGroup(String group, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latest);
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
     nodeToUpdate.setGroup(group);
     return nodeToUpdate;
   }
 
   /**
-   * @param snapshot
-   *          if it is not null, get the result from the given snapshot;
-   *          otherwise, get the result from the current inode.
+   * @param snapshotId
+   *          if it is not {@link Snapshot#CURRENT_STATE_ID}, get the result
+   *          from the given snapshot; otherwise, get the result from the
+   *          current inode.
    * @return permission.
    */
-  abstract FsPermission getFsPermission(Snapshot snapshot);
+  abstract FsPermission getFsPermission(int snapshotId);
   
-  /** The same as getFsPermission(null). */
+  /** The same as getFsPermission(Snapshot.CURRENT_STATE_ID). */
   @Override
   public final FsPermission getFsPermission() {
-    return getFsPermission(null);
+    return getFsPermission(Snapshot.CURRENT_STATE_ID);
   }
 
   /** Set the {@link FsPermission} of this {@link INode} */
   abstract void setPermission(FsPermission permission);
 
   /** Set the {@link FsPermission} of this {@link INode} */
-  INode setPermission(FsPermission permission, Snapshot latest) 
+  INode setPermission(FsPermission permission, int latestSnapshotId) 
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latest);
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
     nodeToUpdate.setPermission(permission);
     return nodeToUpdate;
   }
 
   /**
-   * @return if the given snapshot is null, return this;
-   *     otherwise return the corresponding snapshot inode.
+   * @return if the given snapshot id is {@link Snapshot#CURRENT_STATE_ID},
+   *         return this; otherwise return the corresponding snapshot inode.
    */
-  public INodeAttributes getSnapshotINode(final Snapshot snapshot) {
+  public INodeAttributes getSnapshotINode(final int snapshotId) {
     return this;
   }
 
   /** Is this inode in the latest snapshot? */
-  public final boolean isInLatestSnapshot(final Snapshot latest) {
-    if (latest == null) {
+  public final boolean isInLatestSnapshot(final int latestSnapshotId) {
+    if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) {
       return false;
     }
     // if parent is a reference node, parent must be a renamed node. We can 
@@ -173,10 +176,11 @@ public abstract class INode implements I
     if (parentDir == null) { // root
       return true;
     }
-    if (!parentDir.isInLatestSnapshot(latest)) {
+    if (!parentDir.isInLatestSnapshot(latestSnapshotId)) {
       return false;
     }
-    final INode child = parentDir.getChild(getLocalNameBytes(), latest);
+    final INode child = parentDir.getChild(getLocalNameBytes(),
+        latestSnapshotId);
     if (this == child) {
       return true;
     }
@@ -203,21 +207,22 @@ public abstract class INode implements I
    * operation, or the snapshot belonging to the DST tree.
    * 
    * @param latestInDst
-   *          the latest snapshot in the DST tree above the reference node
+   *          id of the latest snapshot in the DST tree above the reference node
    * @return True: the modification should be recorded in the snapshot that
    *         belongs to the SRC tree. False: the modification should be
    *         recorded in the snapshot that belongs to the DST tree.
    */
-  public final boolean shouldRecordInSrcSnapshot(final Snapshot latestInDst) {
+  public final boolean shouldRecordInSrcSnapshot(final int latestInDst) {
     Preconditions.checkState(!isReference());
 
-    if (latestInDst == null) {
+    if (latestInDst == Snapshot.CURRENT_STATE_ID) {
       return true;
     }
     INodeReference withCount = getParentReference();
     if (withCount != null) {
       int dstSnapshotId = withCount.getParentReference().getDstSnapshotId();
-      if (dstSnapshotId >= latestInDst.getId()) {
+      if (dstSnapshotId != Snapshot.CURRENT_STATE_ID
+          && dstSnapshotId >= latestInDst) {
         return true;
       }
     }
@@ -228,13 +233,14 @@ public abstract class INode implements I
    * This inode is being modified.  The previous version of the inode needs to
    * be recorded in the latest snapshot.
    *
-   * @param latest the latest snapshot that has been taken.
-   *        Note that it is null if no snapshots have been taken.
+   * @param latestSnapshotId The id of the latest snapshot that has been taken.
+   *                         Note that it is {@link Snapshot#CURRENT_STATE_ID} 
+   *                         if no snapshots have been taken.
    * @return The current inode, which usually is the same object of this inode.
    *         However, in some cases, this inode may be replaced with a new inode
    *         for maintaining snapshots. The current inode is then the new inode.
    */
-  abstract INode recordModification(final Snapshot latest)
+  abstract INode recordModification(final int latestSnapshotId)
       throws QuotaExceededException;
 
   /** Check whether it's a reference. */
@@ -330,12 +336,13 @@ public abstract class INode implements I
    * snapshot in its diff list. Recursively clean its children.
    * </pre>
    * 
-   * @param snapshot
-   *          The snapshot to delete. Null means to delete the current
+   * @param snapshotId
+   *          The id of the snapshot to delete. 
+   *          {@link Snapshot#CURRENT_STATE_ID} means to delete the current
    *          file/directory.
-   * @param prior
-   *          The latest snapshot before the to-be-deleted snapshot. When
-   *          deleting a current inode, this parameter captures the latest
+   * @param priorSnapshotId
+   *          The id of the latest snapshot before the to-be-deleted snapshot.
+   *          When deleting a current inode, this parameter captures the latest
    *          snapshot.
    * @param collectedBlocks
    *          blocks collected from the descents for further block
@@ -345,8 +352,8 @@ public abstract class INode implements I
    *          inodeMap
    * @return quota usage delta when deleting a snapshot
    */
-  public abstract Quota.Counts cleanSubtree(final Snapshot snapshot,
-      Snapshot prior, BlocksMapUpdateInfo collectedBlocks,
+  public abstract Quota.Counts cleanSubtree(final int snapshotId,
+      int priorSnapshotId, BlocksMapUpdateInfo collectedBlocks,
       List<INode> removedINodes, boolean countDiffChange)
       throws QuotaExceededException;
   
@@ -460,9 +467,10 @@ public abstract class INode implements I
    * @param counts The subtree counts for returning.
    * @param useCache Whether to use cached quota usage. Note that 
    *                 {@link WithName} node never uses cache for its subtree.
-   * @param lastSnapshotId {@link Snapshot#INVALID_ID} indicates the computation
-   *                       is in the current tree. Otherwise the id indicates
-   *                       the computation range for a {@link WithName} node.
+   * @param lastSnapshotId {@link Snapshot#CURRENT_STATE_ID} indicates the 
+   *                       computation is in the current tree. Otherwise the id
+   *                       indicates the computation range for a 
+   *                       {@link WithName} node.
    * @return The same objects as the counts parameter.
    */
   public abstract Quota.Counts computeQuotaUsage(Quota.Counts counts,
@@ -470,7 +478,7 @@ public abstract class INode implements I
 
   public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
       boolean useCache) {
-    return computeQuotaUsage(counts, useCache, Snapshot.INVALID_ID);
+    return computeQuotaUsage(counts, useCache, Snapshot.CURRENT_STATE_ID);
   }
   
   /**
@@ -558,21 +566,22 @@ public abstract class INode implements I
   }
 
   /**
-   * @param snapshot
-   *          if it is not null, get the result from the given snapshot;
-   *          otherwise, get the result from the current inode.
+   * @param snapshotId
+   *          if it is not {@link Snapshot#CURRENT_STATE_ID}, get the result
+   *          from the given snapshot; otherwise, get the result from the
+   *          current inode.
    * @return modification time.
    */
-  abstract long getModificationTime(Snapshot snapshot);
+  abstract long getModificationTime(int snapshotId);
 
-  /** The same as getModificationTime(null). */
+  /** The same as getModificationTime(Snapshot.CURRENT_STATE_ID). */
   @Override
   public final long getModificationTime() {
-    return getModificationTime(null);
+    return getModificationTime(Snapshot.CURRENT_STATE_ID);
   }
 
   /** Update modification time if it is larger than the current value. */
-  public abstract INode updateModificationTime(long mtime, Snapshot latest) 
+  public abstract INode updateModificationTime(long mtime, int latestSnapshotId) 
       throws QuotaExceededException;
 
   /** Set the last modification time of inode. */
@@ -580,24 +589,25 @@ public abstract class INode implements I
 
   /** Set the last modification time of inode. */
   public final INode setModificationTime(long modificationTime,
-      Snapshot latest) throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latest);
+      int latestSnapshotId) throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
     nodeToUpdate.setModificationTime(modificationTime);
     return nodeToUpdate;
   }
 
   /**
-   * @param snapshot
-   *          if it is not null, get the result from the given snapshot;
-   *          otherwise, get the result from the current inode.
+   * @param snapshotId
+   *          if it is not {@link Snapshot#CURRENT_STATE_ID}, get the result
+   *          from the given snapshot; otherwise, get the result from the
+   *          current inode.
    * @return access time
    */
-  abstract long getAccessTime(Snapshot snapshot);
+  abstract long getAccessTime(int snapshotId);
 
-  /** The same as getAccessTime(null). */
+  /** The same as getAccessTime(Snapshot.CURRENT_STATE_ID). */
   @Override
   public final long getAccessTime() {
-    return getAccessTime(null);
+    return getAccessTime(Snapshot.CURRENT_STATE_ID);
   }
 
   /**
@@ -608,9 +618,9 @@ public abstract class INode implements I
   /**
    * Set last access time of inode.
    */
-  public final INode setAccessTime(long accessTime, Snapshot latest)
+  public final INode setAccessTime(long accessTime, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latest);
+    final INode nodeToUpdate = recordModification(latestSnapshotId);
     nodeToUpdate.setAccessTime(accessTime);
     return nodeToUpdate;
   }
@@ -679,13 +689,15 @@ public abstract class INode implements I
   @VisibleForTesting
   public final StringBuffer dumpTreeRecursively() {
     final StringWriter out = new StringWriter(); 
-    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(), null);
+    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(),
+        Snapshot.CURRENT_STATE_ID);
     return out.getBuffer();
   }
 
   @VisibleForTesting
   public final void dumpTreeRecursively(PrintStream out) {
-    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(), null);
+    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(),
+        Snapshot.CURRENT_STATE_ID);
   }
 
   /**
@@ -694,7 +706,7 @@ public abstract class INode implements I
    */
   @VisibleForTesting
   public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
-      Snapshot snapshot) {
+      int snapshotId) {
     out.print(prefix);
     out.print(" ");
     final String name = getLocalName();
@@ -703,7 +715,7 @@ public abstract class INode implements I
     out.print(getObjectString());
     out.print("), ");
     out.print(getParentString());
-    out.print(", " + getPermissionStatus(snapshot));
+    out.print(", " + getPermissionStatus(snapshotId));
   }
   
   /**

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1556688&r1=1556687&r2=1556688&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Thu Jan  9 00:53:04 2014
@@ -204,9 +204,9 @@ public class INodeDirectory extends INod
   }
   
   @Override
-  public INodeDirectoryAttributes getSnapshotINode(Snapshot snapshot) {
+  public INodeDirectoryAttributes getSnapshotINode(int snapshotId) {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
-    return sf == null ? this : sf.getDiffs().getSnapshotINode(snapshot, this);
+    return sf == null ? this : sf.getDiffs().getSnapshotINode(snapshotId, this);
   }
   
   @Override
@@ -217,12 +217,13 @@ public class INodeDirectory extends INod
 
   /** Replace itself with an {@link INodeDirectorySnapshottable}. */
   public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
-      Snapshot latest, final INodeMap inodeMap) throws QuotaExceededException {
+      int latestSnapshotId, final INodeMap inodeMap)
+      throws QuotaExceededException {
     Preconditions.checkState(!(this instanceof INodeDirectorySnapshottable),
         "this is already an INodeDirectorySnapshottable, this=%s", this);
     final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(this);
     replaceSelf(s, inodeMap).getDirectoryWithSnapshotFeature().getDiffs()
-        .saveSelf2Snapshot(latest, s, this);
+        .saveSelf2Snapshot(latestSnapshotId, s, this);
     return s;
   }
 
@@ -289,8 +290,8 @@ public class INodeDirectory extends INod
   }
 
   INodeReference.WithName replaceChild4ReferenceWithName(INode oldChild,
-      Snapshot latest) {
-    Preconditions.checkArgument(latest != null);
+      int latestSnapshotId) {
+    Preconditions.checkArgument(latestSnapshotId != Snapshot.CURRENT_STATE_ID);
     if (oldChild instanceof INodeReference.WithName) {
       return (INodeReference.WithName)oldChild;
     }
@@ -304,22 +305,23 @@ public class INodeDirectory extends INod
       withCount = new INodeReference.WithCount(null, oldChild);
     }
     final INodeReference.WithName ref = new INodeReference.WithName(this,
-        withCount, oldChild.getLocalNameBytes(), latest.getId());
+        withCount, oldChild.getLocalNameBytes(), latestSnapshotId);
     replaceChild(oldChild, ref, null);
     return ref;
   }
 
   @Override
-  public INodeDirectory recordModification(Snapshot latest) 
+  public INodeDirectory recordModification(int latestSnapshotId) 
       throws QuotaExceededException {
-    if (isInLatestSnapshot(latest) && !shouldRecordInSrcSnapshot(latest)) {
+    if (isInLatestSnapshot(latestSnapshotId)
+        && !shouldRecordInSrcSnapshot(latestSnapshotId)) {
       // add snapshot feature if necessary
       DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
       if (sf == null) {
         sf = addSnapshotFeature(null);
       }
       // record self in the diff list if necessary
-      sf.getDiffs().saveSelf2Snapshot(latest, this, null);
+      sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
     }
     return this;
   }
@@ -329,9 +331,9 @@ public class INodeDirectory extends INod
    * 
    * @return the child inode, which may be replaced.
    */
-  public INode saveChild2Snapshot(final INode child, final Snapshot latest,
+  public INode saveChild2Snapshot(final INode child, final int latestSnapshotId,
       final INode snapshotCopy) throws QuotaExceededException {
-    if (latest == null) {
+    if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) {
       return child;
     }
     
@@ -340,42 +342,45 @@ public class INodeDirectory extends INod
     if (sf == null) {
       sf = this.addSnapshotFeature(null);
     }
-    return sf.saveChild2Snapshot(this, child, latest, snapshotCopy);
+    return sf.saveChild2Snapshot(this, child, latestSnapshotId, snapshotCopy);
   }
 
   /**
    * @param name the name of the child
-   * @param snapshot
-   *          if it is not null, get the result from the given snapshot;
-   *          otherwise, get the result from the current directory.
+   * @param snapshotId
+   *          if it is not {@link Snapshot#CURRENT_STATE_ID}, get the result
+   *          from the corresponding snapshot; otherwise, get the result from
+   *          the current directory.
    * @return the child inode.
    */
-  public INode getChild(byte[] name, Snapshot snapshot) {
+  public INode getChild(byte[] name, int snapshotId) {
     DirectoryWithSnapshotFeature sf;
-    if (snapshot == null || (sf = getDirectoryWithSnapshotFeature()) == null) {
+    if (snapshotId == Snapshot.CURRENT_STATE_ID || 
+        (sf = getDirectoryWithSnapshotFeature()) == null) {
       ReadOnlyList<INode> c = getCurrentChildrenList();
       final int i = ReadOnlyList.Util.binarySearch(c, name);
       return i < 0 ? null : c.get(i);
     }
     
-    return sf.getChild(this, name, snapshot);
+    return sf.getChild(this, name, snapshotId);
   }
   
   /**
-   * @param snapshot
-   *          if it is not null, get the result from the given snapshot;
-   *          otherwise, get the result from the current directory.
+   * @param snapshotId
+   *          if it is not {@link Snapshot#CURRENT_STATE_ID}, get the result
+   *          from the corresponding snapshot; otherwise, get the result from
+   *          the current directory.
    * @return the current children list if the specified snapshot is null;
    *         otherwise, return the children list corresponding to the snapshot.
    *         Note that the returned list is never null.
    */
-  public ReadOnlyList<INode> getChildrenList(final Snapshot snapshot) {
+  public ReadOnlyList<INode> getChildrenList(final int snapshotId) {
     DirectoryWithSnapshotFeature sf;
-    if (snapshot == null
+    if (snapshotId == Snapshot.CURRENT_STATE_ID
         || (sf = this.getDirectoryWithSnapshotFeature()) == null) {
       return getCurrentChildrenList();
     }
-    return sf.getChildrenList(this, snapshot);
+    return sf.getChildrenList(this, snapshotId);
   }
   
   private ReadOnlyList<INode> getCurrentChildrenList() {
@@ -450,15 +455,15 @@ public class INodeDirectory extends INod
   /**
    * Remove the specified child from this directory.
    */
-  public boolean removeChild(INode child, Snapshot latest)
+  public boolean removeChild(INode child, int latestSnapshotId)
       throws QuotaExceededException {
-    if (isInLatestSnapshot(latest)) {
+    if (isInLatestSnapshot(latestSnapshotId)) {
       // create snapshot feature if necessary
       DirectoryWithSnapshotFeature sf = this.getDirectoryWithSnapshotFeature();
       if (sf == null) {
         sf = this.addSnapshotFeature(null);
       }
-      return sf.removeChild(this, child, latest);
+      return sf.removeChild(this, child, latestSnapshotId);
     }
     return removeChild(child);
   }
@@ -493,24 +498,24 @@ public class INodeDirectory extends INod
    *         otherwise, return true;
    */
   public boolean addChild(INode node, final boolean setModTime,
-      final Snapshot latest) throws QuotaExceededException {
+      final int latestSnapshotId) throws QuotaExceededException {
     final int low = searchChildren(node.getLocalNameBytes());
     if (low >= 0) {
       return false;
     }
 
-    if (isInLatestSnapshot(latest)) {
+    if (isInLatestSnapshot(latestSnapshotId)) {
       // create snapshot feature if necessary
       DirectoryWithSnapshotFeature sf = this.getDirectoryWithSnapshotFeature();
       if (sf == null) {
         sf = this.addSnapshotFeature(null);
       }
-      return sf.addChild(this, node, setModTime, latest);
+      return sf.addChild(this, node, setModTime, latestSnapshotId);
     }
     addChild(node, low);
     if (setModTime) {
       // update modification time of the parent directory
-      updateModificationTime(node.getModificationTime(), latest);
+      updateModificationTime(node.getModificationTime(), latestSnapshotId);
     }
     return true;
   }
@@ -548,10 +553,9 @@ public class INodeDirectory extends INod
     // we are computing the quota usage for a specific snapshot here, i.e., the
     // computation only includes files/directories that exist at the time of the
     // given snapshot
-    if (sf != null && lastSnapshotId != Snapshot.INVALID_ID
+    if (sf != null && lastSnapshotId != Snapshot.CURRENT_STATE_ID
         && !(useCache && isQuotaSet())) {
-      Snapshot lastSnapshot = sf.getDiffs().getSnapshotById(lastSnapshotId);
-      ReadOnlyList<INode> childrenList = getChildrenList(lastSnapshot);
+      ReadOnlyList<INode> childrenList = getChildrenList(lastSnapshotId);
       for (INode child : childrenList) {
         child.computeQuotaUsage(counts, useCache, lastSnapshotId);
       }
@@ -607,7 +611,7 @@ public class INodeDirectory extends INod
 
   ContentSummaryComputationContext computeDirectoryContentSummary(
       ContentSummaryComputationContext summary) {
-    ReadOnlyList<INode> childrenList = getChildrenList(null);
+    ReadOnlyList<INode> childrenList = getChildrenList(Snapshot.CURRENT_STATE_ID);
     // Explicit traversing is done to enable repositioning after relinquishing
     // and reacquiring locks.
     for (int i = 0;  i < childrenList.size(); i++) {
@@ -629,7 +633,7 @@ public class INodeDirectory extends INod
         break;
       }
       // Obtain the children list again since it may have been modified.
-      childrenList = getChildrenList(null);
+      childrenList = getChildrenList(Snapshot.CURRENT_STATE_ID);
       // Reposition in case the children list is changed. Decrement by 1
       // since it will be incremented when loops.
       i = nextChild(childrenList, childName) - 1;
@@ -668,21 +672,16 @@ public class INodeDirectory extends INod
    *          The reference node to be removed/replaced
    * @param newChild
    *          The node to be added back
-   * @param latestSnapshot
-   *          The latest snapshot. Note this may not be the last snapshot in the
-   *          diff list, since the src tree of the current rename operation
-   *          may be the dst tree of a previous rename.
    * @throws QuotaExceededException should not throw this exception
    */
   public void undoRename4ScrParent(final INodeReference oldChild,
-      final INode newChild, Snapshot latestSnapshot)
-      throws QuotaExceededException {
+      final INode newChild) throws QuotaExceededException {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     Preconditions.checkState(sf != null,
         "Directory does not have snapshot feature");
     sf.getDiffs().removeChild(ListType.DELETED, oldChild);
     sf.getDiffs().replaceChild(ListType.CREATED, oldChild, newChild);
-    addChild(newChild, true, null);
+    addChild(newChild, true, Snapshot.CURRENT_STATE_ID);
   }
   
   /**
@@ -691,16 +690,14 @@ public class INodeDirectory extends INod
    * and delete possible record in the deleted list.  
    */
   public void undoRename4DstParent(final INode deletedChild,
-      Snapshot latestSnapshot) throws QuotaExceededException {
+      int latestSnapshotId) throws QuotaExceededException {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     Preconditions.checkState(sf != null,
         "Directory does not have snapshot feature");
     boolean removeDeletedChild = sf.getDiffs().removeChild(ListType.DELETED,
         deletedChild);
-    // pass null for inodeMap since the parent node will not get replaced when
-    // undoing rename
-    final boolean added = addChild(deletedChild, true, removeDeletedChild ? null
-        : latestSnapshot);
+    int sid = removeDeletedChild ? Snapshot.CURRENT_STATE_ID : latestSnapshotId;
+    final boolean added = addChild(deletedChild, true, sid);
     // update quota usage if adding is successfully and the old child has not
     // been stored in deleted list before
     if (added && !removeDeletedChild) {
@@ -722,8 +719,8 @@ public class INodeDirectory extends INod
   }
 
   /** Call cleanSubtree(..) recursively down the subtree. */
-  public Quota.Counts cleanSubtreeRecursively(final Snapshot snapshot,
-      Snapshot prior, final BlocksMapUpdateInfo collectedBlocks,
+  public Quota.Counts cleanSubtreeRecursively(final int snapshot,
+      int prior, final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes, final Map<INode, INode> excludedNodes, 
       final boolean countDiffChange) throws QuotaExceededException {
     Quota.Counts counts = Quota.Counts.newInstance();
@@ -732,9 +729,10 @@ public class INodeDirectory extends INod
     // to its latest previous snapshot. (besides, we also need to consider nodes
     // created after prior but before snapshot. this will be done in 
     // DirectoryWithSnapshotFeature)
-    Snapshot s = snapshot != null && prior != null ? prior : snapshot;
+    int s = snapshot != Snapshot.CURRENT_STATE_ID
+        && prior != Snapshot.NO_SNAPSHOT_ID ? prior : snapshot;
     for (INode child : getChildrenList(s)) {
-      if (snapshot != null && excludedNodes != null
+      if (snapshot != Snapshot.CURRENT_STATE_ID && excludedNodes != null
           && excludedNodes.containsKey(child)) {
         continue;
       } else {
@@ -753,7 +751,7 @@ public class INodeDirectory extends INod
     if (sf != null) {
       sf.clear(this, collectedBlocks, removedINodes);
     }
-    for (INode child : getChildrenList(null)) {
+    for (INode child : getChildrenList(Snapshot.CURRENT_STATE_ID)) {
       child.destroyAndCollectBlocks(collectedBlocks, removedINodes);
     }
     clear();
@@ -761,18 +759,19 @@ public class INodeDirectory extends INod
   }
   
   @Override
-  public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
+  public Quota.Counts cleanSubtree(final int snapshotId, int priorSnapshotId,
       final BlocksMapUpdateInfo collectedBlocks,
       final List<INode> removedINodes, final boolean countDiffChange)
       throws QuotaExceededException {
     DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     // there is snapshot data
     if (sf != null) {
-      return sf.cleanDirectory(this, snapshot, prior, collectedBlocks,
-          removedINodes, countDiffChange);
+      return sf.cleanDirectory(this, snapshotId, priorSnapshotId,
+          collectedBlocks, removedINodes, countDiffChange);
     }
     // there is no snapshot data
-    if (prior == null && snapshot == null) {
+    if (priorSnapshotId == Snapshot.NO_SNAPSHOT_ID
+        && snapshotId == Snapshot.CURRENT_STATE_ID) {
       // destroy the whole subtree and collect blocks that should be deleted
       Quota.Counts counts = Quota.Counts.newInstance();
       this.computeQuotaUsage(counts, true);
@@ -780,7 +779,7 @@ public class INodeDirectory extends INod
       return counts; 
     } else {
       // process recursively down the subtree
-      Quota.Counts counts = cleanSubtreeRecursively(snapshot, prior,
+      Quota.Counts counts = cleanSubtreeRecursively(snapshotId, priorSnapshotId,
           collectedBlocks, removedINodes, null, countDiffChange);
       if (isQuotaSet()) {
         getDirectoryWithQuotaFeature().addSpaceConsumed2Cache(
@@ -816,7 +815,7 @@ public class INodeDirectory extends INod
   @VisibleForTesting
   @Override
   public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
-      final Snapshot snapshot) {
+      final int snapshot) {
     super.dumpTreeRecursively(out, prefix, snapshot);
     out.print(", childrenSize=" + getChildrenList(snapshot).size());
     final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
@@ -824,7 +823,7 @@ public class INodeDirectory extends INod
       out.print(", " + q);
     }
     if (this instanceof Snapshot.Root) {
-      out.print(", snapshotId=" + snapshot.getId());
+      out.print(", snapshotId=" + snapshot);
     }
     out.println();
 
@@ -869,7 +868,7 @@ public class INodeDirectory extends INod
       for(final Iterator<SnapshotAndINode> i = subs.iterator(); i.hasNext();) {
         final SnapshotAndINode pair = i.next();
         prefix.append(i.hasNext()? DUMPTREE_EXCEPT_LAST_ITEM: DUMPTREE_LAST_ITEM);
-        pair.inode.dumpTreeRecursively(out, prefix, pair.snapshot);
+        pair.inode.dumpTreeRecursively(out, prefix, pair.snapshotId);
         prefix.setLength(prefix.length() - 2);
       }
     }
@@ -877,20 +876,16 @@ public class INodeDirectory extends INod
 
   /** A pair of Snapshot and INode objects. */
   protected static class SnapshotAndINode {
-    public final Snapshot snapshot;
+    public final int snapshotId;
     public final INode inode;
 
-    public SnapshotAndINode(Snapshot snapshot, INode inode) {
-      this.snapshot = snapshot;
+    public SnapshotAndINode(int snapshot, INode inode) {
+      this.snapshotId = snapshot;
       this.inode = inode;
     }
-
-    public SnapshotAndINode(Snapshot snapshot) {
-      this(snapshot, snapshot.getRoot());
-    }
   }
 
-  public final int getChildrenNum(final Snapshot snapshot) {
-    return getChildrenList(snapshot).size();
+  public final int getChildrenNum(final int snapshotId) {
+    return getChildrenList(snapshotId).size();
   }
 }



Mime
View raw message