hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1480838 [5/7] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/...
Date Thu, 09 May 2013 23:55:52 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu May  9 23:55:49 2013
@@ -33,8 +33,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
@@ -75,6 +75,7 @@ import static org.apache.hadoop.util.Tim
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -147,6 +148,9 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
@@ -167,7 +171,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
@@ -176,6 +179,11 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -333,6 +341,7 @@ public class FSNamesystem implements Nam
   /** The namespace tree. */
   FSDirectory dir;
   private final BlockManager blockManager;
+  private final SnapshotManager snapshotManager;
   private final DatanodeStatistics datanodeStatistics;
 
   // Block pool ID used by this namenode
@@ -622,6 +631,7 @@ public class FSNamesystem implements Nam
 
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(fsImage, this, conf);
+      this.snapshotManager = new SnapshotManager(dir);
       this.safeMode = new SafeModeInfo(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -1382,21 +1392,25 @@ public class FSNamesystem implements Nam
           doAccessTime = false;
         }
 
-        long now = now();
-        final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
-        if (doAccessTime && isAccessTimeSupported()) {
+        final INodesInPath iip = dir.getLastINodeInPath(src);
+        final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
+        if (!iip.isSnapshot() //snapshots are readonly, so don't update atime.
+            && doAccessTime && isAccessTimeSupported()) {
+          final long now = now();
           if (now > inode.getAccessTime() + getAccessTimePrecision()) {
             // if we have to set access time but we only have the readlock, then
             // restart this entire operation with the writeLock.
             if (isReadOp) {
               continue;
             }
-            dir.setTimes(src, inode, -1, now, false);
+            dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshot());
           }
         }
-        return blockManager.createLocatedBlocks(inode.getBlocks(),
-            inode.computeFileSize(false), inode.isUnderConstruction(),
-            offset, length, needBlockToken);
+        final long fileSize = iip.getPathSnapshot() != null?
+            inode.computeFileSize(iip.getPathSnapshot())
+            : inode.computeFileSizeNotIncludingLastUcBlock();
+        return blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
+            inode.isUnderConstruction(), offset, length, needBlockToken);
       } finally {
         if (isReadOp) {
           readUnlock();
@@ -1494,7 +1508,8 @@ public class FSNamesystem implements Nam
     // replication and blocks sizes should be the same for ALL the blocks
 
     // check the target
-    final INodeFile trgInode = INodeFile.valueOf(dir.getINode(target), target);
+    final INodeFile trgInode = INodeFile.valueOf(dir.getINode4Write(target),
+        target);
     if(trgInode.isUnderConstruction()) {
       throw new HadoopIllegalArgumentException("concat: target file "
           + target + " is under construction");
@@ -1504,6 +1519,10 @@ public class FSNamesystem implements Nam
       throw new HadoopIllegalArgumentException("concat: target file "
           + target + " is empty");
     }
+    if (trgInode instanceof INodeFileWithSnapshot) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is in a snapshot");
+    }
 
     long blockSize = trgInode.getPreferredBlockSize();
 
@@ -1516,7 +1535,7 @@ public class FSNamesystem implements Nam
     }
 
     si.add(trgInode);
-    short repl = trgInode.getBlockReplication();
+    final short repl = trgInode.getFileReplication();
 
     // now check the srcs
     boolean endSrc = false; // final src file doesn't have to have full end block
@@ -1525,7 +1544,7 @@ public class FSNamesystem implements Nam
       if(i==srcs.length-1)
         endSrc=true;
 
-      final INodeFile srcInode = INodeFile.valueOf(dir.getINode(src), src);
+      final INodeFile srcInode = INodeFile.valueOf(dir.getINode4Write(src), src);
       if(src.isEmpty() 
           || srcInode.isUnderConstruction()
           || srcInode.numBlocks() == 0) {
@@ -1612,9 +1631,10 @@ public class FSNamesystem implements Nam
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.WRITE);
       }
-      INode inode = dir.getINode(src);
+      final INodesInPath iip = dir.getINodesInPath4Write(src);
+      final INode inode = iip.getLastINode();
       if (inode != null) {
-        dir.setTimes(src, inode, mtime, atime, true);
+        dir.setTimes(src, inode, mtime, atime, true, iip.getLatestSnapshot());
         resultingStat = getAuditFileInfo(src, false);
       } else {
         throw new FileNotFoundException("File/Directory " + src + " does not exist.");
@@ -1727,11 +1747,11 @@ public class FSNamesystem implements Nam
         checkPathAccess(pc, src, FsAction.WRITE);
       }
 
-      final short[] oldReplication = new short[1];
-      final Block[] blocks = dir.setReplication(src, replication, oldReplication);
+      final short[] blockRepls = new short[2]; // 0: old, 1: new
+      final Block[] blocks = dir.setReplication(src, replication, blockRepls);
       isFile = blocks != null;
       if (isFile) {
-        blockManager.setReplication(oldReplication[0], replication, src, blocks);
+        blockManager.setReplication(blockRepls[0], blockRepls[1], src, blocks);
       }
     } finally {
       writeUnlock();
@@ -1881,16 +1901,18 @@ public class FSNamesystem implements Nam
       ParentNotDirectoryException, IOException {
     assert hasWriteLock();
     // Verify that the destination does not exist as a directory already.
-    boolean pathExists = dir.exists(src);
-    if (pathExists && dir.isDir(src)) {
+    final INodesInPath iip = dir.getINodesInPath4Write(src);
+    final INode inode = iip.getLastINode();
+    if (inode != null && inode.isDirectory()) {
       throw new FileAlreadyExistsException("Cannot create file " + src
           + "; already exists as a directory.");
     }
+    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
 
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
     boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
-      if (append || (overwrite && pathExists)) {
+      if (append || (overwrite && myFile != null)) {
         checkPathAccess(pc, src, FsAction.WRITE);
       } else {
         checkAncestorAccess(pc, src, FsAction.WRITE);
@@ -1904,7 +1926,7 @@ public class FSNamesystem implements Nam
     try {
       blockManager.verifyReplication(src, replication, clientMachine);
       boolean create = flag.contains(CreateFlag.CREATE);
-      final INode myFile = dir.getINode(src);
+      
       if (myFile == null) {
         if (!create) {
           throw new FileNotFoundException("failed to overwrite or append to non-existent file "
@@ -1931,8 +1953,8 @@ public class FSNamesystem implements Nam
 
       if (append && myFile != null) {
         final INodeFile f = INodeFile.valueOf(myFile, src); 
-        return prepareFileForWrite(
-            src, f, holder, clientMachine, clientNode, true);
+        return prepareFileForWrite(src, f, holder, clientMachine, clientNode,
+            true, iip.getLatestSnapshot());
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1979,19 +2001,12 @@ public class FSNamesystem implements Nam
    */
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
       String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog) throws IOException {
-    INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                    file.getId(),
-                                    file.getLocalNameBytes(),
-                                    file.getBlockReplication(),
-                                    file.getModificationTime(),
-                                    file.getPreferredBlockSize(),
-                                    file.getBlocks(),
-                                    file.getPermissionStatus(),
-                                    leaseHolder,
-                                    clientMachine,
-                                    clientNode);
-    dir.replaceNode(src, file, cons);
+      boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
+    file = file.recordModification(latestSnapshot, dir.getINodeMap());
+    final INodeFileUnderConstruction cons = file.toUnderConstruction(
+        leaseHolder, clientMachine, clientNode);
+
+    dir.replaceINodeFile(src, file, cons);
     leaseManager.addLease(cons.getClientName(), src);
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
@@ -2053,7 +2068,7 @@ public class FSNamesystem implements Nam
     return false;
   }
 
-  private void recoverLeaseInternal(INode fileInode, 
+  private void recoverLeaseInternal(INodeFile fileInode, 
       String src, String holder, String clientMachine, boolean force)
       throws IOException {
     assert hasWriteLock();
@@ -2265,7 +2280,7 @@ public class FSNamesystem implements Nam
       }
       blockSize = pendingFile.getPreferredBlockSize();
       clientNode = pendingFile.getClientNode();
-      replication = pendingFile.getBlockReplication();
+      replication = pendingFile.getFileReplication();
     } finally {
       readUnlock();
     }
@@ -2305,7 +2320,7 @@ public class FSNamesystem implements Nam
       saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
       dir.persistBlocks(src, pendingFile);
-      offset = pendingFile.computeFileSize(true);
+      offset = pendingFile.computeFileSize();
     } finally {
       writeUnlock();
     }
@@ -2336,11 +2351,9 @@ public class FSNamesystem implements Nam
     checkFsObjectLimit();
 
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    final INodesInPath inodesInPath =
-        dir.rootDir.getExistingPathINodes(src, true);
-    final INode[] inodes = inodesInPath.getINodes();
+    final INodesInPath iip = dir.getINodesInPath4Write(src);
     final INodeFileUnderConstruction pendingFile
-        = checkLease(src, fileId, clientName, inodes[inodes.length - 1]);
+        = checkLease(src, fileId, clientName, iip.getLastINode());
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
     if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
       // The block that the client claims is the current last block
@@ -2394,11 +2407,11 @@ public class FSNamesystem implements Nam
         NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
             "caught retry for allocation of a new block in " +
             src + ". Returning previously allocated block " + lastBlockInFile);
-        long offset = pendingFile.computeFileSize(true);
+        long offset = pendingFile.computeFileSize();
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
             offset);
-        return inodesInPath;
+        return iip;
       } else {
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
@@ -2411,7 +2424,7 @@ public class FSNamesystem implements Nam
     if (!checkFileProgress(pendingFile, false)) {
       throw new NotReplicatedYetException("Not replicated yet: " + src);
     }
-    return inodesInPath;
+    return iip;
   }
 
   LocatedBlock makeLocatedBlock(Block blk,
@@ -2515,26 +2528,26 @@ public class FSNamesystem implements Nam
     return true;
   }
   
-  // make sure that we still have the lease on this file.
+  /** make sure that we still have the lease on this file. */
   private INodeFileUnderConstruction checkLease(String src, String holder)
       throws LeaseExpiredException, UnresolvedLinkException,
       FileNotFoundException {
-    assert hasReadOrWriteLock();
     return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
         dir.getINode(src));
   }
   
   private INodeFileUnderConstruction checkLease(String src, long fileId,
-      String holder, INode file) throws LeaseExpiredException,
+      String holder, INode inode) throws LeaseExpiredException,
       FileNotFoundException {
     assert hasReadOrWriteLock();
-    if (file == null || !(file instanceof INodeFile)) {
+    if (inode == null || !inode.isFile()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException(
           "No lease on " + src + ": File does not exist. "
           + (lease != null ? lease.toString()
               : "Holder " + holder + " does not have any open files."));
     }
+    final INodeFile file = inode.asFile();
     if (!file.isUnderConstruction()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException(
@@ -2589,19 +2602,23 @@ public class FSNamesystem implements Nam
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
     assert hasWriteLock();
-    INodeFileUnderConstruction pendingFile;
+    final INodesInPath iip = dir.getLastINodeInPath(src);
+    final INodeFileUnderConstruction pendingFile;
     try {
-      pendingFile = checkLease(src, holder);
+      pendingFile = checkLease(src, INodeId.GRANDFATHER_INODE_ID,
+          holder, iip.getINode(0)); 
     } catch (LeaseExpiredException lee) {
       final INode inode = dir.getINode(src);
-      if (inode != null && inode instanceof INodeFile && !inode.isUnderConstruction()) {
+      if (inode != null
+          && inode.isFile()
+          && !inode.asFile().isUnderConstruction()) {
         // This could be a retry RPC - i.e the client tried to close
         // the file, but missed the RPC response. Thus, it is trying
         // again to close the file. If the file still exists and
         // the client's view of the last block matches the actual
         // last block, then we'll treat it as a successful close.
         // See HDFS-3031.
-        final Block realLastBlock = ((INodeFile)inode).getLastBlock();
+        final Block realLastBlock = inode.asFile().getLastBlock();
         if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
           NameNode.stateChangeLog.info("DIR* completeFile: " +
               "request from " + holder + " to complete " + src +
@@ -2619,7 +2636,8 @@ public class FSNamesystem implements Nam
       return false;
     }
 
-    finalizeINodeFileUnderConstruction(src, pendingFile);
+    finalizeINodeFileUnderConstruction(src, pendingFile,
+        iip.getLatestSnapshot());
     return true;
   }
 
@@ -2878,6 +2896,7 @@ public class FSNamesystem implements Nam
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+    List<INode> removedINodes = new ArrayList<INode>();
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
@@ -2895,7 +2914,7 @@ public class FSNamesystem implements Nam
         checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
       // Unlink the target directory from directory tree
-      if (!dir.delete(src, collectedBlocks)) {
+      if (!dir.delete(src, collectedBlocks, removedINodes)) {
         return false;
       }
     } finally {
@@ -2904,6 +2923,8 @@ public class FSNamesystem implements Nam
     getEditLog().logSync(); 
     removeBlocks(collectedBlocks); // Incremental deletion of blocks
     collectedBlocks.clear();
+    dir.removeFromInodeMap(removedINodes);
+    removedINodes.clear();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
         + src +" is removed");
@@ -2920,7 +2941,7 @@ public class FSNamesystem implements Nam
    *          An instance of {@link BlocksMapUpdateInfo} which contains a list
    *          of blocks that need to be removed from blocksMap
    */
-  private void removeBlocks(BlocksMapUpdateInfo blocks) {
+  void removeBlocks(BlocksMapUpdateInfo blocks) {
     int start = 0;
     int end = 0;
     List<Block> toDeleteList = blocks.getToDeleteList();
@@ -2940,13 +2961,21 @@ public class FSNamesystem implements Nam
   }
   
   /**
-   * Remove leases and blocks related to a given path
+   * Remove leases, inodes and blocks related to a given path
    * @param src The given path
    * @param blocks Containing the list of blocks to be deleted from blocksMap
+   * @param removedINodes Containing the list of inodes to be removed from 
+   *                      inodesMap
    */
-  void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks) {
+  void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks,
+      List<INode> removedINodes) {
     assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
+    // remove inodes from inodesMap
+    if (removedINodes != null) {
+      dir.removeFromInodeMap(removedINodes);
+      removedINodes.clear();
+    }
     if (blocks == null) {
       return;
     }
@@ -3120,7 +3149,7 @@ public class FSNamesystem implements Nam
     if (isPermissionEnabled) {
       checkTraverse(pc, src);
     }
-    if (dir.isDir(src)) {
+    if (dir.isDirMutable(src)) {
       // all the users of mkdirs() are used to expect 'true' even if
       // a new directory is not created.
       return true;
@@ -3184,7 +3213,7 @@ public class FSNamesystem implements Nam
     }
     getEditLog().logSync();
   }
-  
+
   /** Persist all metadata about this file.
    * @param src The string representation of the path
    * @param clientName The string representation of the client
@@ -3236,8 +3265,9 @@ public class FSNamesystem implements Nam
     assert !isInSafeMode();
     assert hasWriteLock();
 
+    final INodesInPath iip = dir.getLastINodeInPath(src);
     final INodeFileUnderConstruction pendingFile
-        = INodeFileUnderConstruction.valueOf(dir.getINode(src), src);
+        = INodeFileUnderConstruction.valueOf(iip.getINode(0), src);
     int nrBlocks = pendingFile.numBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
 
@@ -3254,7 +3284,8 @@ public class FSNamesystem implements Nam
     // If there are no incomplete blocks associated with this file,
     // then reap lease immediately and close the file.
     if(nrCompleteBlocks == nrBlocks) {
-      finalizeINodeFileUnderConstruction(src, pendingFile);
+      finalizeINodeFileUnderConstruction(src, pendingFile,
+          iip.getLatestSnapshot());
       NameNode.stateChangeLog.warn("BLOCK*"
         + " internalReleaseLease: All existing blocks are COMPLETE,"
         + " lease removed, file closed.");
@@ -3302,7 +3333,8 @@ public class FSNamesystem implements Nam
       // Close file if committed blocks are minimally replicated
       if(penultimateBlockMinReplication &&
           blockManager.checkMinReplication(lastBlock)) {
-        finalizeINodeFileUnderConstruction(src, pendingFile);
+        finalizeINodeFileUnderConstruction(src, pendingFile,
+            iip.getLatestSnapshot());
         NameNode.stateChangeLog.warn("BLOCK*"
           + " internalReleaseLease: Committed blocks are minimally replicated,"
           + " lease removed, file closed.");
@@ -3372,7 +3404,7 @@ public class FSNamesystem implements Nam
     if (diff > 0) {
       try {
         String path = leaseManager.findPath(fileINode);
-        dir.updateSpaceConsumed(path, 0, -diff * fileINode.getBlockReplication());
+        dir.updateSpaceConsumed(path, 0, -diff*fileINode.getFileReplication());
       } catch (IOException e) {
         LOG.warn("Unexpected exception while updating disk space.", e);
       }
@@ -3380,15 +3412,18 @@ public class FSNamesystem implements Nam
   }
 
   private void finalizeINodeFileUnderConstruction(String src, 
-      INodeFileUnderConstruction pendingFile) 
+      INodeFileUnderConstruction pendingFile, Snapshot latestSnapshot) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     leaseManager.removeLease(pendingFile.getClientName(), src);
+    
+    pendingFile = pendingFile.recordModification(latestSnapshot,
+        dir.getINodeMap());
 
     // The file is no longer pending.
     // Create permanent INode, update blocks
-    INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile);
+    final INodeFile newFile = pendingFile.toINodeFile(now());
+    dir.replaceINodeFile(src, pendingFile, newFile);
 
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
@@ -3426,7 +3461,7 @@ public class FSNamesystem implements Nam
       if (storedBlock == null) {
         throw new IOException("Block (=" + lastblock + ") not found");
       }
-      INodeFile iFile = (INodeFile) storedBlock.getBlockCollection();
+      INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
       if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
         throw new IOException("Unexpected block (=" + lastblock
                               + ") since the file (=" + iFile.getLocalName()
@@ -3481,7 +3516,8 @@ public class FSNamesystem implements Nam
         commitOrCompleteLastBlock(pendingFile, storedBlock);
 
         //remove lease, close file
-        finalizeINodeFileUnderConstruction(src, pendingFile);
+        finalizeINodeFileUnderConstruction(src, pendingFile,
+            Snapshot.findLatestSnapshot(pendingFile, null));
       } else {
         // If this commit does not want to close the file, persist blocks
         dir.persistBlocks(src, pendingFile);
@@ -3824,6 +3860,16 @@ public class FSNamesystem implements Nam
   public int getTotalLoad() {
     return datanodeStatistics.getXceiverCount();
   }
+  
+  @Metric({ "SnapshottableDirectories", "Number of snapshottable directories" })
+  public int getNumSnapshottableDirs() {
+    return this.snapshotManager.getNumSnapshottableDirs();
+  }
+
+  @Metric({ "Snapshots", "The number of snapshots" })
+  public int getNumSnapshots() {
+    return this.snapshotManager.getNumSnapshots();
+  }
 
   int getNumberOfDatanodes(DatanodeReportType type) {
     readLock();
@@ -4966,7 +5012,7 @@ public class FSNamesystem implements Nam
     }
     
     // check file inode
-    INodeFile file = (INodeFile) storedBlock.getBlockCollection();
+    final INodeFile file = ((INode)storedBlock.getBlockCollection()).asFile();
     if (file==null || !file.isUnderConstruction()) {
       throw new IOException("The file " + storedBlock + 
           " belonged to does not exist or it is not under construction.");
@@ -5246,7 +5292,7 @@ public class FSNamesystem implements Nam
 
       while (blkIterator.hasNext()) {
         Block blk = blkIterator.next();
-        INode inode = (INodeFile) blockManager.getBlockCollection(blk);
+        final INode inode = (INode)blockManager.getBlockCollection(blk);
         skip++;
         if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
           String src = FSDirectory.getFullPathName(inode);
@@ -5422,7 +5468,7 @@ public class FSNamesystem implements Nam
   /**
    * @param in load the state of secret manager from input stream
    */
-  void loadSecretManagerState(DataInputStream in) throws IOException {
+  void loadSecretManagerState(DataInput in) throws IOException {
     dtSecretManager.loadSecretManagerState(in);
   }
 
@@ -5779,6 +5825,272 @@ public class FSNamesystem implements Nam
     return this.blockManager.getDatanodeManager()
         .shouldAvoidStaleDataNodesForWrite();
   }
+  
+  public SnapshotManager getSnapshotManager() {
+    return snapshotManager;
+  }
+  
+  /** Allow snapshot on a directroy. */
+  void allowSnapshot(String path) throws SafeModeException, IOException {
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot allow snapshot for " + path,
+            safeMode);
+      }
+      checkSuperuserPrivilege();
+
+      dir.writeLock();
+      try {
+        snapshotManager.setSnapshottable(path, true);
+      } finally {
+        dir.writeUnlock();
+      }
+      getEditLog().logAllowSnapshot(path);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(true, "allowSnapshot", path, null, null);
+    }
+  }
+  
+  /** Disallow snapshot on a directory. */
+  void disallowSnapshot(String path) throws SafeModeException, IOException {
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot disallow snapshot for " + path,
+            safeMode);
+      }
+      checkSuperuserPrivilege();
+
+      dir.writeLock();
+      try {
+        snapshotManager.resetSnapshottable(path);
+      } finally {
+        dir.writeUnlock();
+      }
+      getEditLog().logDisallowSnapshot(path);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(true, "disallowSnapshot", path, null, null);
+    }
+  }
+  
+  /**
+   * Create a snapshot
+   * @param snapshotRoot The directory path where the snapshot is taken
+   * @param snapshotName The name of the snapshot
+   */
+  String createSnapshot(String snapshotRoot, String snapshotName)
+      throws SafeModeException, IOException {
+    final FSPermissionChecker pc = getPermissionChecker();
+    writeLock();
+    final String snapshotPath;
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot create snapshot for "
+            + snapshotRoot, safeMode);
+      }
+      if (isPermissionEnabled) {
+        checkOwner(pc, snapshotRoot);
+      }
+
+      if (snapshotName == null || snapshotName.isEmpty()) {
+        snapshotName = Snapshot.generateDefaultSnapshotName();
+      }
+      dir.verifySnapshotName(snapshotName, snapshotRoot);
+      dir.writeLock();
+      try {
+        snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName);
+      } finally {
+        dir.writeUnlock();
+      }
+      getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(true, "createSnapshot", snapshotRoot, snapshotPath, null);
+    }
+    return snapshotPath;
+  }
+  
+  /**
+   * Rename a snapshot
+   * @param path The directory path where the snapshot was taken
+   * @param snapshotOldName Old snapshot name
+   * @param snapshotNewName New snapshot name
+   * @throws SafeModeException
+   * @throws IOException 
+   */
+  void renameSnapshot(String path, String snapshotOldName,
+      String snapshotNewName) throws SafeModeException, IOException {
+    final FSPermissionChecker pc = getPermissionChecker();
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot rename snapshot for " + path,
+            safeMode);
+      }
+      if (isPermissionEnabled) {
+        checkOwner(pc, path);
+      }
+      dir.verifySnapshotName(snapshotNewName, path);
+      
+      snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
+      getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
+      String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
+      logAuditEvent(true, "renameSnapshot", oldSnapshotRoot, newSnapshotRoot, null);
+    }
+  }
+  
+  /**
+   * Get the list of snapshottable directories that are owned 
+   * by the current user. Return all the snapshottable directories if the 
+   * current user is a super user.
+   * @return The list of all the current snapshottable directories
+   * @throws IOException
+   */
+  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
+      throws IOException {
+    SnapshottableDirectoryStatus[] status = null;
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      FSPermissionChecker checker = new FSPermissionChecker(
+          fsOwner.getShortUserName(), supergroup);
+      final String user = checker.isSuperUser()? null : checker.getUser();
+      status = snapshotManager.getSnapshottableDirListing(user);
+    } finally {
+      readUnlock();
+    }
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(true, "listSnapshottableDirectory", null, null, null);
+    }
+    return status;
+  }
+  
+  /**
+   * Get the difference between two snapshots (or between a snapshot and the
+   * current status) of a snapshottable directory.
+   * 
+   * @param path The full path of the snapshottable directory.
+   * @param fromSnapshot Name of the snapshot to calculate the diff from. Null
+   *          or empty string indicates the current tree.
+   * @param toSnapshot Name of the snapshot to calculated the diff to. Null or
+   *          empty string indicates the current tree.
+   * @return A report about the difference between {@code fromSnapshot} and 
+   *         {@code toSnapshot}. Modified/deleted/created/renamed files and 
+   *         directories belonging to the snapshottable directories are listed 
+   *         and labeled as M/-/+/R respectively. 
+   * @throws IOException
+   */
+  SnapshotDiffReport getSnapshotDiffReport(String path,
+      String fromSnapshot, String toSnapshot) throws IOException {
+    SnapshotDiffInfo diffs = null;
+    final FSPermissionChecker pc = getPermissionChecker();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      if (isPermissionEnabled) {
+        checkSubtreeReadPermission(pc, path, fromSnapshot);
+        checkSubtreeReadPermission(pc, path, toSnapshot);
+      }
+      diffs = snapshotManager.diff(path, fromSnapshot, toSnapshot);
+    } finally {
+      readUnlock();
+    }
+    
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(true, "computeSnapshotDiff", null, null, null);
+    }
+    return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
+        path, fromSnapshot, toSnapshot,
+        Collections.<DiffReportEntry> emptyList());
+  }
+  
+  private void checkSubtreeReadPermission(final FSPermissionChecker pc,
+      final String snapshottablePath, final String snapshot)
+          throws AccessControlException, UnresolvedLinkException {
+    final String fromPath = snapshot == null?
+        snapshottablePath: Snapshot.getSnapshotPath(snapshottablePath, snapshot);
+    checkPermission(pc, fromPath, false, null, null, FsAction.READ, FsAction.READ);
+  }
+  
+  /**
+   * Delete a snapshot of a snapshottable directory
+   * @param snapshotRoot The snapshottable directory
+   * @param snapshotName The name of the to-be-deleted snapshot
+   * @throws SafeModeException
+   * @throws IOException
+   */
+  void deleteSnapshot(String snapshotRoot, String snapshotName)
+      throws SafeModeException, IOException {
+    final FSPermissionChecker pc = getPermissionChecker();
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot delete snapshot for " + snapshotRoot, safeMode);
+      }
+      checkOwner(pc, snapshotRoot);
+
+      BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+      List<INode> removedINodes = new ArrayList<INode>();
+      dir.writeLock();
+      try {
+        snapshotManager.deleteSnapshot(snapshotRoot, snapshotName,
+            collectedBlocks, removedINodes);
+        dir.removeFromInodeMap(removedINodes);
+      } finally {
+        dir.writeUnlock();
+      }
+      removedINodes.clear();
+      this.removeBlocks(collectedBlocks);
+      collectedBlocks.clear();
+      getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+    
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
+      logAuditEvent(true, "deleteSnapshot", rootPath, null, null);
+    }
+  }
+
+  /**
+   * Remove a list of INodeDirectorySnapshottable from the SnapshotManager
+   * @param toRemove the list of INodeDirectorySnapshottable to be removed
+   */
+  void removeSnapshottableDirs(List<INodeDirectorySnapshottable> toRemove) {
+    if (snapshotManager != null) {
+      snapshotManager.removeSnapshottable(toRemove);
+    }
+  }
 
   /**
    * Default AuditLogger implementation; used when no access logger is

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Thu May  9 23:55:49 2013
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -41,6 +42,15 @@ import org.apache.hadoop.security.UserGr
  */
 class FSPermissionChecker {
   static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
+
+  /** @return a string for throwing {@link AccessControlException} */
+  private static String toAccessControlString(INode inode) {
+    return "\"" + inode.getFullPathName() + "\":"
+          + inode.getUserName() + ":" + inode.getGroupName()
+          + ":" + (inode.isDirectory()? "d": "-") + inode.getFsPermission();
+  }
+
+
   private final UserGroupInformation ugi;
   private final String user;  
   /** A set with group namess. Not synchronized since it is unmodifiable */
@@ -132,110 +142,114 @@ class FSPermissionChecker {
     }
     // check if (parentAccess != null) && file exists, then check sb
       // Resolve symlinks, the check is performed on the link target.
-      final INode[] inodes = root.getExistingPathINodes(path, true).getINodes();
+      final INodesInPath inodesInPath = root.getINodesInPath(path, true); 
+      final Snapshot snapshot = inodesInPath.getPathSnapshot();
+      final INode[] inodes = inodesInPath.getINodes();
       int ancestorIndex = inodes.length - 2;
       for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
           ancestorIndex--);
-      checkTraverse(inodes, ancestorIndex);
+      checkTraverse(inodes, ancestorIndex, snapshot);
 
+      final INode last = inodes[inodes.length - 1];
       if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
-          && inodes.length > 1 && inodes[inodes.length - 1] != null) {
-        checkStickyBit(inodes[inodes.length - 2], inodes[inodes.length - 1]);
+          && inodes.length > 1 && last != null) {
+        checkStickyBit(inodes[inodes.length - 2], last, snapshot);
       }
       if (ancestorAccess != null && inodes.length > 1) {
-        check(inodes, ancestorIndex, ancestorAccess);
+        check(inodes, ancestorIndex, snapshot, ancestorAccess);
       }
       if (parentAccess != null && inodes.length > 1) {
-        check(inodes, inodes.length - 2, parentAccess);
+        check(inodes, inodes.length - 2, snapshot, parentAccess);
       }
       if (access != null) {
-        check(inodes[inodes.length - 1], access);
+        check(last, snapshot, access);
       }
       if (subAccess != null) {
-        checkSubAccess(inodes[inodes.length - 1], subAccess);
+        checkSubAccess(last, snapshot, subAccess);
       }
       if (doCheckOwner) {
-        checkOwner(inodes[inodes.length - 1]);
+        checkOwner(last, snapshot);
       }
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkOwner(INode inode) throws AccessControlException {
-    if (inode != null && user.equals(inode.getUserName())) {
+  private void checkOwner(INode inode, Snapshot snapshot
+      ) throws AccessControlException {
+    if (inode != null && user.equals(inode.getUserName(snapshot))) {
       return;
     }
     throw new AccessControlException("Permission denied");
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkTraverse(INode[] inodes, int last
+  private void checkTraverse(INode[] inodes, int last, Snapshot snapshot
       ) throws AccessControlException {
     for(int j = 0; j <= last; j++) {
-      check(inodes[j], FsAction.EXECUTE);
+      check(inodes[j], snapshot, FsAction.EXECUTE);
     }
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkSubAccess(INode inode, FsAction access
+  private void checkSubAccess(INode inode, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     if (inode == null || !inode.isDirectory()) {
       return;
     }
 
     Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
-    for(directories.push((INodeDirectory)inode); !directories.isEmpty(); ) {
+    for(directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
       INodeDirectory d = directories.pop();
-      check(d, access);
+      check(d, snapshot, access);
 
-      for(INode child : d.getChildrenList()) {
+      for(INode child : d.getChildrenList(snapshot)) {
         if (child.isDirectory()) {
-          directories.push((INodeDirectory)child);
+          directories.push(child.asDirectory());
         }
       }
     }
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode[] inodes, int i, FsAction access
+  private void check(INode[] inodes, int i, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
-    check(i >= 0? inodes[i]: null, access);
+    check(i >= 0? inodes[i]: null, snapshot, access);
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void check(INode inode, FsAction access
+  private void check(INode inode, Snapshot snapshot, FsAction access
       ) throws AccessControlException {
     if (inode == null) {
       return;
     }
-    FsPermission mode = inode.getFsPermission();
+    FsPermission mode = inode.getFsPermission(snapshot);
 
-    if (user.equals(inode.getUserName())) { //user class
+    if (user.equals(inode.getUserName(snapshot))) { //user class
       if (mode.getUserAction().implies(access)) { return; }
     }
-    else if (groups.contains(inode.getGroupName())) { //group class
+    else if (groups.contains(inode.getGroupName(snapshot))) { //group class
       if (mode.getGroupAction().implies(access)) { return; }
     }
     else { //other class
       if (mode.getOtherAction().implies(access)) { return; }
     }
     throw new AccessControlException("Permission denied: user=" + user
-        + ", access=" + access + ", inode=" + inode);
+        + ", access=" + access + ", inode=" + toAccessControlString(inode));
   }
 
   /** Guarded by {@link FSNamesystem#readLock()} */
-  private void checkStickyBit(INode parent, INode inode)
-      throws AccessControlException {
-    if(!parent.getFsPermission().getStickyBit()) {
+  private void checkStickyBit(INode parent, INode inode, Snapshot snapshot
+      ) throws AccessControlException {
+    if(!parent.getFsPermission(snapshot).getStickyBit()) {
       return;
     }
 
     // If this user is the directory owner, return
-    if(parent.getUserName().equals(user)) {
+    if(parent.getUserName(snapshot).equals(user)) {
       return;
     }
 
     // if this user is the file owner, return
-    if(inode.getUserName().equals(user)) {
+    if(inode.getUserName(snapshot).equals(user)) {
       return;
     }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Thu May  9 23:55:49 2013
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
@@ -30,12 +32,17 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.util.LightWeightGSet.LinkedElement;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.Diff;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.SignedBytes;
+import com.google.common.base.Preconditions;
 
 /**
  * We keep an in-memory representation of the file/block hierarchy.
@@ -43,173 +50,202 @@ import com.google.common.primitives.Sign
  * directory inodes.
  */
 @InterfaceAudience.Private
-abstract class INode implements Comparable<byte[]>, LinkedElement {
-  static final List<INode> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<INode>());
+public abstract class INode implements Diff.Element<byte[]> {
+  public static final Log LOG = LogFactory.getLog(INode.class);
 
-  /** Wrapper of two counters for namespace consumed and diskspace consumed. */
-  static class DirCounts {
-    /** namespace count */
-    long nsCount = 0;
-    /** diskspace count */
-    long dsCount = 0;
-    
-    /** returns namespace count */
-    long getNsCount() {
-      return nsCount;
-    }
-    /** returns diskspace count */
-    long getDsCount() {
-      return dsCount;
-    }
-  }
-  
-  private static enum PermissionStatusFormat {
-    MODE(0, 16),
-    GROUP(MODE.OFFSET + MODE.LENGTH, 25),
-    USER(GROUP.OFFSET + GROUP.LENGTH, 23);
+  /** parent is either an {@link INodeDirectory} or an {@link INodeReference}.*/
+  private INode parent = null;
 
-    final int OFFSET;
-    final int LENGTH; //bit length
-    final long MASK;
+  INode(INode parent) {
+    this.parent = parent;
+  }
 
-    PermissionStatusFormat(int offset, int length) {
-      OFFSET = offset;
-      LENGTH = length;
-      MASK = ((-1L) >>> (64 - LENGTH)) << OFFSET;
-    }
+  /** Get inode id */
+  public abstract long getId();
 
-    long retrieve(long record) {
-      return (record & MASK) >>> OFFSET;
-    }
+  /**
+   * Check whether this is the root inode.
+   */
+  final boolean isRoot() {
+    return getLocalNameBytes().length == 0;
+  }
 
-    long combine(long bits, long record) {
-      return (record & ~MASK) | (bits << OFFSET);
-    }
+  /** Get the {@link PermissionStatus} */
+  abstract PermissionStatus getPermissionStatus(Snapshot snapshot);
 
-    /** Encode the {@link PermissionStatus} to a long. */
-    static long toLong(PermissionStatus ps) {
-      long permission = 0L;
-      final int user = SerialNumberManager.INSTANCE.getUserSerialNumber(
-          ps.getUserName());
-      permission = USER.combine(user, permission);
-      final int group = SerialNumberManager.INSTANCE.getGroupSerialNumber(
-          ps.getGroupName());
-      permission = GROUP.combine(group, permission);
-      final int mode = ps.getPermission().toShort();
-      permission = MODE.combine(mode, permission);
-      return permission;
-    }
+  /** The same as getPermissionStatus(null). */
+  final PermissionStatus getPermissionStatus() {
+    return getPermissionStatus(null);
   }
 
   /**
-   * The inode id
-   */
-  final private long id;
-
-  /**
-   *  The inode name is in java UTF8 encoding; 
-   *  The name in HdfsFileStatus should keep the same encoding as this.
-   *  if this encoding is changed, implicitly getFileInfo and listStatus in
-   *  clientProtocol are changed; The decoding at the client
-   *  side should change accordingly.
-   */
-  private byte[] name = null;
-  /** 
-   * Permission encoded using {@link PermissionStatusFormat}.
-   * Codes other than {@link #clonePermissionStatus(INode)}
-   * and {@link #updatePermissionStatus(PermissionStatusFormat, long)}
-   * should not modify it.
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return user name
    */
-  private long permission = 0L;
-  protected INodeDirectory parent = null;
-  protected long modificationTime = 0L;
-  protected long accessTime = 0L;
-  protected LinkedElement next = null;
+  abstract String getUserName(Snapshot snapshot);
 
-  private INode(long id, byte[] name, long permission, INodeDirectory parent,
-      long modificationTime, long accessTime) {
-    this.id = id;
-    this.name = name;
-    this.permission = permission;
-    this.parent = parent;
-    this.modificationTime = modificationTime;
-    this.accessTime = accessTime;
+  /** The same as getUserName(null). */
+  public final String getUserName() {
+    return getUserName(null);
   }
 
-  INode(long id, byte[] name, PermissionStatus permissions,
-      INodeDirectory parent, long modificationTime, long accessTime) {
-    this(id, name, PermissionStatusFormat.toLong(permissions), parent,
-        modificationTime, accessTime);
-  }
-  
-  INode(long id, PermissionStatus permissions, long mtime, long atime) {
-    this(id, null, PermissionStatusFormat.toLong(permissions), null, mtime, atime);
+  /** Set user */
+  abstract void setUser(String user);
+
+  /** Set user */
+  final INode setUser(String user, Snapshot latest, INodeMap inodeMap)
+      throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest, inodeMap);
+    nodeToUpdate.setUser(user);
+    return nodeToUpdate;
+  }
+  /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return group name
+   */
+  abstract String getGroupName(Snapshot snapshot);
+
+  /** The same as getGroupName(null). */
+  public final String getGroupName() {
+    return getGroupName(null);
   }
-  
-  protected INode(long id, String name, PermissionStatus permissions) {
-    this(id, DFSUtil.string2Bytes(name), permissions, null, 0L, 0L);
+
+  /** Set group */
+  abstract void setGroup(String group);
+
+  /** Set group */
+  final INode setGroup(String group, Snapshot latest, INodeMap inodeMap)
+      throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest, inodeMap);
+    nodeToUpdate.setGroup(group);
+    return nodeToUpdate;
   }
+
+  /**
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return permission.
+   */
+  abstract FsPermission getFsPermission(Snapshot snapshot);
   
-  /** @param other Other node to be copied */
-  INode(INode other) {
-    this(other.getId(), other.getLocalNameBytes(), other.permission, other
-        .getParent(), other.getModificationTime(), other.getAccessTime());
+  /** The same as getFsPermission(null). */
+  public final FsPermission getFsPermission() {
+    return getFsPermission(null);
   }
 
-  /** Get inode id */
-  public long getId() {
-    return this.id;
+  /** Set the {@link FsPermission} of this {@link INode} */
+  abstract void setPermission(FsPermission permission);
+
+  /** Set the {@link FsPermission} of this {@link INode} */
+  INode setPermission(FsPermission permission, Snapshot latest,
+      INodeMap inodeMap) throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest, inodeMap);
+    nodeToUpdate.setPermission(permission);
+    return nodeToUpdate;
   }
-  
+
   /**
-   * Check whether this is the root inode.
+   * @return if the given snapshot is null, return this;
+   *     otherwise return the corresponding snapshot inode.
    */
-  boolean isRoot() {
-    return name.length == 0;
+  public INode getSnapshotINode(final Snapshot snapshot) {
+    return this;
   }
 
-  /** Clone the {@link PermissionStatus}. */
-  void clonePermissionStatus(INode that) {
-    this.permission = that.permission;
-  }
-  /** Get the {@link PermissionStatus} */
-  protected PermissionStatus getPermissionStatus() {
-    return new PermissionStatus(getUserName(),getGroupName(),getFsPermission());
-  }
-  private void updatePermissionStatus(PermissionStatusFormat f, long n) {
-    permission = f.combine(n, permission);
-  }
-  /** Get user name */
-  public String getUserName() {
-    int n = (int)PermissionStatusFormat.USER.retrieve(permission);
-    return SerialNumberManager.INSTANCE.getUser(n);
+  /** Is this inode in the latest snapshot? */
+  public final boolean isInLatestSnapshot(final Snapshot latest) {
+    if (latest == null) {
+      return false;
+    }
+    // if parent is a reference node, parent must be a renamed node. We can 
+    // stop the check at the reference node.
+    if (parent != null && parent.isReference()) {
+      return true;
+    }
+    final INodeDirectory parentDir = getParent();
+    if (parentDir == null) { // root
+      return true;
+    }
+    if (!parentDir.isInLatestSnapshot(latest)) {
+      return false;
+    }
+    final INode child = parentDir.getChild(getLocalNameBytes(), latest);
+    if (this == child) {
+      return true;
+    }
+    if (child == null || !(child.isReference())) {
+      return false;
+    }
+    return this == child.asReference().getReferredINode();
   }
-  /** Set user */
-  protected void setUser(String user) {
-    int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
-    updatePermissionStatus(PermissionStatusFormat.USER, n);
-  }
-  /** Get group name */
-  public String getGroupName() {
-    int n = (int)PermissionStatusFormat.GROUP.retrieve(permission);
-    return SerialNumberManager.INSTANCE.getGroup(n);
+  
+  /** @return true if the given inode is an ancestor directory of this inode. */
+  public final boolean isAncestorDirectory(final INodeDirectory dir) {
+    for(INodeDirectory p = getParent(); p != null; p = p.getParent()) {
+      if (p == dir) {
+        return true;
+      }
+    }
+    return false;
   }
-  /** Set group */
-  protected void setGroup(String group) {
-    int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
-    updatePermissionStatus(PermissionStatusFormat.GROUP, n);
-  }
-  /** Get the {@link FsPermission} */
-  public FsPermission getFsPermission() {
-    return new FsPermission(
-        (short)PermissionStatusFormat.MODE.retrieve(permission));
+
+  /**
+   * When {@link #recordModification} is called on a referred node,
+   * this method tells which snapshot the modification should be
+   * associated with: the snapshot that belongs to the SRC tree of the rename
+   * operation, or the snapshot belonging to the DST tree.
+   * 
+   * @param latestInDst
+   *          the latest snapshot in the DST tree above the reference node
+   * @return True: the modification should be recorded in the snapshot that
+   *         belongs to the SRC tree. False: the modification should be
+   *         recorded in the snapshot that belongs to the DST tree.
+   */
+  public final boolean shouldRecordInSrcSnapshot(final Snapshot latestInDst) {
+    Preconditions.checkState(!isReference());
+
+    if (latestInDst == null) {
+      return true;
+    }
+    INodeReference withCount = getParentReference();
+    if (withCount != null) {
+      int dstSnapshotId = withCount.getParentReference().getDstSnapshotId();
+      if (dstSnapshotId >= latestInDst.getId()) {
+        return true;
+      }
+    }
+    return false;
   }
-  protected short getFsPermissionShort() {
-    return (short)PermissionStatusFormat.MODE.retrieve(permission);
+
+  /**
+   * This inode is being modified.  The previous version of the inode needs to
+   * be recorded in the latest snapshot.
+   *
+   * @param latest the latest snapshot that has been taken.
+   *        Note that it is null if no snapshots have been taken.
+   * @param inodeMap while recording modification, the inode or its parent may 
+   *                 get replaced, and the inodeMap needs to be updated.
+   * @return The current inode, which usually is the same object of this inode.
+   *         However, in some cases, this inode may be replaced with a new inode
+   *         for maintaining snapshots. The current inode is then the new inode.
+   */
+  abstract INode recordModification(final Snapshot latest,
+      final INodeMap inodeMap) throws QuotaExceededException;
+
+  /** Check whether it's a reference. */
+  public boolean isReference() {
+    return false;
   }
-  /** Set the {@link FsPermission} of this {@link INode} */
-  void setPermission(FsPermission permission) {
-    updatePermissionStatus(PermissionStatusFormat.MODE, permission.toShort());
+
+  /** Cast this inode to an {@link INodeReference}.  */
+  public INodeReference asReference() {
+    throw new IllegalStateException("Current inode is not a reference: "
+        + this.toDetailString());
   }
 
   /**
@@ -219,6 +255,12 @@ abstract class INode implements Comparab
     return false;
   }
 
+  /** Cast this inode to an {@link INodeFile}.  */
+  public INodeFile asFile() {
+    throw new IllegalStateException("Current inode is not a file: "
+        + this.toDetailString());
+  }
+
   /**
    * Check whether it's a directory
    */
@@ -226,164 +268,346 @@ abstract class INode implements Comparab
     return false;
   }
 
+  /** Cast this inode to an {@link INodeDirectory}.  */
+  public INodeDirectory asDirectory() {
+    throw new IllegalStateException("Current inode is not a directory: "
+        + this.toDetailString());
+  }
+
+  /**
+   * Check whether it's a symlink
+   */
+  public boolean isSymlink() {
+    return false;
+  }
+
+  /** Cast this inode to an {@link INodeSymlink}.  */
+  public INodeSymlink asSymlink() {
+    throw new IllegalStateException("Current inode is not a symlink: "
+        + this.toDetailString());
+  }
+
   /**
-   * Collect all the blocks in all children of this INode. Count and return the
-   * number of files in the sub tree. Also clears references since this INode is
-   * deleted.
-   * 
-   * @param info
-   *          Containing all the blocks collected from the children of this
-   *          INode. These blocks later should be removed from the blocksMap.
+   * Clean the subtree under this inode and collect the blocks from the descents
+   * for further block deletion/update. The current inode can either resides in
+   * the current tree or be stored as a snapshot copy.
+   * 
+   * <pre>
+   * In general, we have the following rules. 
+   * 1. When deleting a file/directory in the current tree, we have different 
+   * actions according to the type of the node to delete. 
+   * 
+   * 1.1 The current inode (this) is an {@link INodeFile}. 
+   * 1.1.1 If {@code prior} is null, there is no snapshot taken on ancestors 
+   * before. Thus we simply destroy (i.e., to delete completely, no need to save 
+   * snapshot copy) the current INode and collect its blocks for further 
+   * cleansing.
+   * 1.1.2 Else do nothing since the current INode will be stored as a snapshot
+   * copy.
+   * 
+   * 1.2 The current inode is an {@link INodeDirectory}.
+   * 1.2.1 If {@code prior} is null, there is no snapshot taken on ancestors 
+   * before. Similarly, we destroy the whole subtree and collect blocks.
+   * 1.2.2 Else do nothing with the current INode. Recursively clean its 
+   * children.
+   * 
+   * 1.3 The current inode is a {@link FileWithSnapshot}.
+   * Call recordModification(..) to capture the current states.
+   * Mark the INode as deleted.
+   * 
+   * 1.4 The current inode is a {@link INodeDirectoryWithSnapshot}.
+   * Call recordModification(..) to capture the current states. 
+   * Destroy files/directories created after the latest snapshot 
+   * (i.e., the inodes stored in the created list of the latest snapshot).
+   * Recursively clean remaining children. 
+   *
+   * 2. When deleting a snapshot.
+   * 2.1 To clean {@link INodeFile}: do nothing.
+   * 2.2 To clean {@link INodeDirectory}: recursively clean its children.
+   * 2.3 To clean {@link FileWithSnapshot}: delete the corresponding snapshot in
+   * its diff list.
+   * 2.4 To clean {@link INodeDirectoryWithSnapshot}: delete the corresponding 
+   * snapshot in its diff list. Recursively clean its children.
+   * </pre>
+   * 
+   * @param snapshot
+   *          The snapshot to delete. Null means to delete the current
+   *          file/directory.
+   * @param prior
+   *          The latest snapshot before the to-be-deleted snapshot. When
+   *          deleting a current inode, this parameter captures the latest
+   *          snapshot.
+   * @param collectedBlocks
+   *          blocks collected from the descents for further block
+   *          deletion/update will be added to the given map.
+   * @param removedINodes
+   *          INodes collected from the descents for further cleaning up of 
+   *          inodeMap         
+   * @return quota usage delta when deleting a snapshot
+   */
+  public abstract Quota.Counts cleanSubtree(final Snapshot snapshot,
+      Snapshot prior, BlocksMapUpdateInfo collectedBlocks,
+      List<INode> removedINodes) throws QuotaExceededException;
+  
+  /**
+   * Destroy self and clear everything! If the INode is a file, this method
+   * collects its blocks for further block deletion. If the INode is a
+   * directory, the method goes down the subtree and collects blocks from the
+   * descents, and clears its parent/children references as well. The method
+   * also clears the diff list if the INode contains snapshot diff list.
+   * 
+   * @param collectedBlocks
+   *          blocks collected from the descents for further block
+   *          deletion/update will be added to this map.
+   * @param removedINodes
+   *          INodes collected from the descents for further cleaning up of
+   *          inodeMap
    */
-  abstract int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info);
+  public abstract void destroyAndCollectBlocks(
+      BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes);
 
   /** Compute {@link ContentSummary}. */
   public final ContentSummary computeContentSummary() {
-    long[] a = computeContentSummary(new long[]{0,0,0,0});
-    return new ContentSummary(a[0], a[1], a[2], getNsQuota(), 
-                              a[3], getDsQuota());
+    final Content.Counts counts = computeContentSummary(
+        Content.Counts.newInstance());
+    return new ContentSummary(counts.get(Content.LENGTH),
+        counts.get(Content.FILE) + counts.get(Content.SYMLINK),
+        counts.get(Content.DIRECTORY), getNsQuota(),
+        counts.get(Content.DISKSPACE), getDsQuota());
   }
+
   /**
-   * @return an array of three longs. 
-   * 0: length, 1: file count, 2: directory count 3: disk space
+   * Count subtree content summary with a {@link Content.Counts}.
+   *
+   * @param counts The subtree counts for returning.
+   * @return The same objects as the counts parameter.
    */
-  abstract long[] computeContentSummary(long[] summary);
+  public abstract Content.Counts computeContentSummary(Content.Counts counts);
   
   /**
+   * Check and add namespace/diskspace consumed to itself and the ancestors.
+   * @throws QuotaExceededException if quote is violated.
+   */
+  public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify,
+      int snapshotId) throws QuotaExceededException {
+    if (parent != null) {
+      parent.addSpaceConsumed(nsDelta, dsDelta, verify, snapshotId);
+    }
+  }
+
+  public void addSpaceConsumedToRenameSrc(long nsDelta, long dsDelta,
+      boolean verify, int snapshotId) throws QuotaExceededException {
+    if (parent != null) {
+      parent.addSpaceConsumedToRenameSrc(nsDelta, dsDelta, verify, snapshotId);
+    }
+  }
+
+  /**
    * Get the quota set for this inode
    * @return the quota if it is set; -1 otherwise
    */
-  long getNsQuota() {
+  public long getNsQuota() {
     return -1;
   }
 
-  long getDsQuota() {
+  public long getDsQuota() {
     return -1;
   }
   
-  boolean isQuotaSet() {
+  public final boolean isQuotaSet() {
     return getNsQuota() >= 0 || getDsQuota() >= 0;
   }
   
   /**
-   * Adds total number of names and total disk space taken under 
-   * this tree to counts.
-   * Returns updated counts object.
+   * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages.
    */
-  abstract DirCounts spaceConsumedInTree(DirCounts counts);
+  public final Quota.Counts computeQuotaUsage() {
+    return computeQuotaUsage(new Quota.Counts(), true);
+  }
+
+  /**
+   * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages.
+   * 
+   * With the existence of {@link INodeReference}, the same inode and its
+   * subtree may be referred by multiple {@link WithName} nodes and a
+   * {@link DstReference} node. To avoid circles while quota usage computation,
+   * we have the following rules:
+   * 
+   * <pre>
+   * 1. For a {@link DstReference} node, since the node must be in the current
+   * tree (or has been deleted as the end point of a series of rename 
+   * operations), we compute the quota usage of the referred node (and its 
+   * subtree) in the regular manner, i.e., including every inode in the current
+   * tree and in snapshot copies, as well as the size of diff list.
+   * 
+   * 2. For a {@link WithName} node, since the node must be in a snapshot, we 
+   * only count the quota usage for those nodes that still existed at the 
+   * creation time of the snapshot associated with the {@link WithName} node.
+   * We do not count in the size of the diff list.  
+   * <pre>
+   * 
+   * @param counts The subtree counts for returning.
+   * @param useCache Whether to use cached quota usage. Note that 
+   *                 {@link WithName} node never uses cache for its subtree.
+   * @param lastSnapshotId {@link Snapshot#INVALID_ID} indicates the computation
+   *                       is in the current tree. Otherwise the id indicates
+   *                       the computation range for a {@link WithName} node.
+   * @return The same objects as the counts parameter.
+   */
+  public abstract Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache, int lastSnapshotId);
+
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
+      boolean useCache) {
+    return computeQuotaUsage(counts, useCache, Snapshot.INVALID_ID);
+  }
   
   /**
    * @return null if the local name is null; otherwise, return the local name.
    */
-  String getLocalName() {
+  public final String getLocalName() {
+    final byte[] name = getLocalNameBytes();
     return name == null? null: DFSUtil.bytes2String(name);
   }
 
-
-  String getLocalParentDir() {
-    INode inode = isRoot() ? this : getParent();
-    String parentDir = "";
-    if (inode != null) {
-      parentDir = inode.getFullPathName();
-    }
-    return (parentDir != null) ? parentDir : "";
-  }
-
   /**
    * @return null if the local name is null;
    *         otherwise, return the local name byte array.
    */
-  byte[] getLocalNameBytes() {
-    return name;
-  }
+  public abstract byte[] getLocalNameBytes();
 
-  /**
-   * Set local file name
-   */
-  void setLocalName(String name) {
-    this.name = DFSUtil.string2Bytes(name);
+  @Override
+  public final byte[] getKey() {
+    return getLocalNameBytes();
   }
 
   /**
    * Set local file name
    */
-  void setLocalName(byte[] name) {
-    this.name = name;
-  }
+  public abstract void setLocalName(byte[] name);
 
   public String getFullPathName() {
     // Get the full path name of this inode.
     return FSDirectory.getFullPathName(this);
   }
-
+  
   @Override
   public String toString() {
-    return "\"" + getFullPathName() + "\":"
-    + getUserName() + ":" + getGroupName() + ":"
-    + (isDirectory()? "d": "-") + getFsPermission();
+    return getLocalName();
   }
 
-  /**
-   * Get parent directory 
-   * @return parent INode
-   */
-  INodeDirectory getParent() {
-    return this.parent;
+  @VisibleForTesting
+  public final String getObjectString() {
+    return getClass().getSimpleName() + "@"
+        + Integer.toHexString(super.hashCode());
   }
 
-  /** 
-   * Get last modification time of inode.
-   * @return access time
-   */
-  public long getModificationTime() {
-    return this.modificationTime;
+  /** @return a string description of the parent. */
+  @VisibleForTesting
+  public final String getParentString() {
+    final INodeReference parentRef = getParentReference();
+    if (parentRef != null) {
+      return "parentRef=" + parentRef.getLocalName() + "->";
+    } else {
+      final INodeDirectory parentDir = getParent();
+      if (parentDir != null) {
+        return "parentDir=" + parentDir.getLocalName() + "/";
+      } else {
+        return "parent=null";
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public String toDetailString() {
+    return toString() + "(" + getObjectString() + "), " + getParentString();
+  }
+
+  /** @return the parent directory */
+  public final INodeDirectory getParent() {
+    return parent == null? null
+        : parent.isReference()? getParentReference().getParent(): parent.asDirectory();
   }
 
   /**
-   * Set last modification time of inode.
+   * @return the parent as a reference if this is a referred inode;
+   *         otherwise, return null.
    */
-  void setModificationTime(long modtime) {
-    assert isDirectory();
-    if (this.modificationTime <= modtime) {
-      this.modificationTime = modtime;
-    }
+  public INodeReference getParentReference() {
+    return parent == null || !parent.isReference()? null: (INodeReference)parent;
+  }
+
+  /** Set parent directory */
+  public final void setParent(INodeDirectory parent) {
+    this.parent = parent;
+  }
+
+  /** Set container. */
+  public final void setParentReference(INodeReference parent) {
+    this.parent = parent;
+  }
+
+  /** Clear references to other objects. */
+  public void clear() {
+    setParent(null);
   }
 
   /**
-   * Always set the last modification time of inode.
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
+   * @return modification time.
    */
-  void setModificationTimeForce(long modtime) {
-    this.modificationTime = modtime;
+  abstract long getModificationTime(Snapshot snapshot);
+
+  /** The same as getModificationTime(null). */
+  public final long getModificationTime() {
+    return getModificationTime(null);
+  }
+
+  /** Update modification time if it is larger than the current value. */
+  public abstract INode updateModificationTime(long mtime, Snapshot latest,
+      INodeMap inodeMap) throws QuotaExceededException;
+
+  /** Set the last modification time of inode. */
+  public abstract void setModificationTime(long modificationTime);
+
+  /** Set the last modification time of inode. */
+  public final INode setModificationTime(long modificationTime,
+      Snapshot latest, INodeMap inodeMap) throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest, inodeMap);
+    nodeToUpdate.setModificationTime(modificationTime);
+    return nodeToUpdate;
   }
 
   /**
-   * Get access time of inode.
+   * @param snapshot
+   *          if it is not null, get the result from the given snapshot;
+   *          otherwise, get the result from the current inode.
    * @return access time
    */
-  public long getAccessTime() {
-    return accessTime;
+  abstract long getAccessTime(Snapshot snapshot);
+
+  /** The same as getAccessTime(null). */
+  public final long getAccessTime() {
+    return getAccessTime(null);
   }
 
   /**
    * Set last access time of inode.
    */
-  void setAccessTime(long atime) {
-    accessTime = atime;
-  }
+  public abstract void setAccessTime(long accessTime);
 
   /**
-   * Is this inode being constructed?
+   * Set last access time of inode.
    */
-  public boolean isUnderConstruction() {
-    return false;
+  public final INode setAccessTime(long accessTime, Snapshot latest,
+      INodeMap inodeMap) throws QuotaExceededException {
+    final INode nodeToUpdate = recordModification(latest, inodeMap);
+    nodeToUpdate.setAccessTime(accessTime);
+    return nodeToUpdate;
   }
 
-  /**
-   * Check whether it's a symlink
-   */
-  public boolean isSymlink() {
-    return false;
-  }
 
   /**
    * Breaks file path into components.
@@ -419,41 +643,9 @@ abstract class INode implements Comparab
     return StringUtils.split(path, Path.SEPARATOR_CHAR);
   }
 
-  /**
-   * Given some components, create a path name.
-   * @param components The path components
-   * @param start index
-   * @param end index
-   * @return concatenated path
-   */
-  static String constructPath(byte[][] components, int start, int end) {
-    StringBuilder buf = new StringBuilder();
-    for (int i = start; i < end; i++) {
-      buf.append(DFSUtil.bytes2String(components[i]));
-      if (i < end - 1) {
-        buf.append(Path.SEPARATOR);
-      }
-    }
-    return buf.toString();
-  }
-
-  boolean removeNode() {
-    if (parent == null) {
-      return false;
-    } else {
-      parent.removeChild(this);
-      parent = null;
-      return true;
-    }
-  }
-
-  private static final byte[] EMPTY_BYTES = {};
-
   @Override
   public final int compareTo(byte[] bytes) {
-    final byte[] left = name == null? EMPTY_BYTES: name;
-    final byte[] right = bytes == null? EMPTY_BYTES: bytes;
-    return SignedBytes.lexicographicalComparator().compare(left, right);
+    return DFSUtil.compareBytes(getLocalNameBytes(), bytes);
   }
 
   @Override
@@ -464,78 +656,47 @@ abstract class INode implements Comparab
     if (that == null || !(that instanceof INode)) {
       return false;
     }
-    return id == ((INode) that).id;
+    return getId() == ((INode) that).getId();
   }
 
   @Override
   public final int hashCode() {
+    long id = getId();
     return (int)(id^(id>>>32));  
   }
   
   /**
-   * Create an INode; the inode's name is not set yet
-   * 
-   * @param id preassigned inode id
-   * @param permissions permissions
-   * @param blocks blocks if a file
-   * @param symlink symblic link if a symbolic link
-   * @param replication replication factor
-   * @param modificationTime modification time
-   * @param atime access time
-   * @param nsQuota namespace quota
-   * @param dsQuota disk quota
-   * @param preferredBlockSize block size
-   * @return an inode
-   */
-  static INode newINode(long id,
-                        PermissionStatus permissions,
-                        BlockInfo[] blocks,
-                        String symlink,
-                        short replication,
-                        long modificationTime,
-                        long atime,
-                        long nsQuota,
-                        long dsQuota,
-                        long preferredBlockSize) {
-    if (symlink.length() != 0) { // check if symbolic link
-      return new INodeSymlink(id, symlink, modificationTime, atime, permissions);
-    }  else if (blocks == null) { //not sym link and blocks null? directory!
-      if (nsQuota >= 0 || dsQuota >= 0) {
-        return new INodeDirectoryWithQuota(
-             id, permissions, modificationTime, nsQuota, dsQuota);
-      } 
-      // regular directory
-      return new INodeDirectory(id, permissions, modificationTime);
-    }
-    // file
-    return new INodeFile(id, permissions, blocks, replication,
-        modificationTime, atime, preferredBlockSize);
-  }
-
-  /**
    * Dump the subtree starting from this inode.
    * @return a text representation of the tree.
    */
   @VisibleForTesting
-  public StringBuffer dumpTreeRecursively() {
+  public final StringBuffer dumpTreeRecursively() {
     final StringWriter out = new StringWriter(); 
-    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder());
+    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(), null);
     return out.getBuffer();
   }
 
+  @VisibleForTesting
+  public final void dumpTreeRecursively(PrintStream out) {
+    dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(), null);
+  }
+
   /**
    * Dump tree recursively.
    * @param prefix The prefix string that each line should print.
    */
   @VisibleForTesting
-  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix) {
+  public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
+      Snapshot snapshot) {
     out.print(prefix);
     out.print(" ");
-    out.print(getLocalName());
+    final String name = getLocalName();
+    out.print(name.isEmpty()? "/": name);
     out.print("   (");
-    final String s = super.toString();
-    out.print(s.substring(s.lastIndexOf(getClass().getSimpleName())));
-    out.println(")");
+    out.print(getObjectString());
+    out.print("), ");
+    out.print(getParentString());
+    out.print(", " + getPermissionStatus(snapshot));
   }
   
   /**
@@ -581,14 +742,4 @@ abstract class INode implements Comparab
       toDeleteList.clear();
     }
   }
-  
-  @Override
-  public void setNext(LinkedElement next) {
-    this.next = next;
-  }
-  
-  @Override
-  public LinkedElement getNext() {
-    return next;
-  }
 }



Mime
View raw message