hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1480838 [4/7] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/...
Date: Thu, 09 May 2013 23:55:52 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu May  9 23:55:49 2013
@@ -44,10 +44,14 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CreateSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteSnapshotOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogSegmentOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
@@ -55,6 +59,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
@@ -662,7 +667,7 @@ public class FSEditLog implements LogsPu
     AddOp op = AddOp.getInstance(cache.get())
       .setInodeId(newNode.getId())
       .setPath(path)
-      .setReplication(newNode.getBlockReplication())
+      .setReplication(newNode.getFileReplication())
       .setModificationTime(newNode.getModificationTime())
       .setAccessTime(newNode.getAccessTime())
       .setBlockSize(newNode.getPreferredBlockSize())
@@ -680,7 +685,7 @@ public class FSEditLog implements LogsPu
   public void logCloseFile(String path, INodeFile newNode) {
     CloseOp op = CloseOp.getInstance(cache.get())
       .setPath(path)
-      .setReplication(newNode.getBlockReplication())
+      .setReplication(newNode.getFileReplication())
       .setModificationTime(newNode.getModificationTime())
       .setAccessTime(newNode.getAccessTime())
       .setBlockSize(newNode.getPreferredBlockSize())
@@ -870,6 +875,37 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
+  void logCreateSnapshot(String snapRoot, String snapName) {
+    CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get())
+        .setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+    logEdit(op);
+  }
+  
+  void logDeleteSnapshot(String snapRoot, String snapName) {
+    DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get())
+        .setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+    logEdit(op);
+  }
+  
+  void logRenameSnapshot(String path, String snapOldName, String snapNewName) {
+    RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get())
+        .setSnapshotRoot(path).setSnapshotOldName(snapOldName)
+        .setSnapshotNewName(snapNewName);
+    logEdit(op);
+  }
+  
+  void logAllowSnapshot(String path) {
+    AllowSnapshotOp op = AllowSnapshotOp.getInstance(cache.get())
+        .setSnapshotRoot(path);
+    logEdit(op);
+  }
+
+  void logDisallowSnapshot(String path) {
+    DisallowSnapshotOp op = DisallowSnapshotOp.getInstance(cache.get())
+        .setSnapshotRoot(path);
+    logEdit(op);
+  }
+  
   /**
    * Get all the journals this edit log is currently operating on.
    */
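
The five log*Snapshot methods added above all follow the same shape: take a pooled op from the per-thread cache, fill it in through chained setters, and hand it to logEdit(). Below is a minimal, self-contained sketch of that chained-setter pattern using a plain stand-in class. It is illustrative only, not the real FSEditLog or FSEditLogOp API, and the path "/user/data" and snapshot name "s0" are made-up example values.

// Stand-in for the chained-setter style used by
// CreateSnapshotOp.getInstance(cache.get()).setSnapshotRoot(..).setSnapshotName(..)
// in logCreateSnapshot() above. Not the real Hadoop classes.
public class SnapshotOpSketch {
  private String snapshotRoot;
  private String snapshotName;

  // Each setter returns "this" so calls can be chained.
  public SnapshotOpSketch setSnapshotRoot(String snapRoot) {
    this.snapshotRoot = snapRoot;
    return this;
  }

  public SnapshotOpSketch setSnapshotName(String snapName) {
    this.snapshotName = snapName;
    return this;
  }

  @Override
  public String toString() {
    return "SnapshotOpSketch [snapshotRoot=" + snapshotRoot
        + ", snapshotName=" + snapshotName + "]";
  }

  public static void main(String[] args) {
    // Roughly what logCreateSnapshot("/user/data", "s0") builds before logEdit(op).
    SnapshotOpSketch op = new SnapshotOpSketch()
        .setSnapshotRoot("/user/data")
        .setSnapshotName("s0");
    System.out.println(op);
  }
}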

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu May  9 23:55:49 2013
@@ -22,8 +22,10 @@ import static org.apache.hadoop.util.Tim
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,16 +39,21 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CreateSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteSnapshotOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
@@ -58,6 +65,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.Holder;
 
@@ -268,7 +276,9 @@ public class FSEditLogLoader {
       // 3. OP_ADD to open file for append
 
       // See if the file already exists (persistBlocks call)
-      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+      final INodesInPath iip = fsDir.getLastINodeInPath(addCloseOp.path);
+      final INodeFile oldFile = INodeFile.valueOf(
+          iip.getINode(0), addCloseOp.path, true);
       INodeFile newFile = oldFile;
       if (oldFile == null) { // this is OP_ADD on a new file (case 1)
         // versions > 0 support per file replication
@@ -280,7 +290,7 @@ public class FSEditLogLoader {
         // add to the file tree
         inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
             lastInodeId);
-        newFile = (INodeFile) fsDir.unprotectedAddFile(inodeId,
+        newFile = fsDir.unprotectedAddFile(inodeId,
             addCloseOp.path, addCloseOp.permissions, replication,
             addCloseOp.mtime, addCloseOp.atime, addCloseOp.blockSize, true,
             addCloseOp.clientName, addCloseOp.clientMachine);
@@ -295,8 +305,9 @@ public class FSEditLogLoader {
           }
           fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
               addCloseOp.clientName, addCloseOp.clientMachine, null,
-              false);
-          newFile = getINodeFile(fsDir, addCloseOp.path);
+              false, iip.getLatestSnapshot());
+          newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
+              addCloseOp.path, true);
         }
       }
       // Fall-through for case 2.
@@ -304,8 +315,8 @@ public class FSEditLogLoader {
       // update the block list.
       
       // Update the salient file attributes.
-      newFile.setAccessTime(addCloseOp.atime);
-      newFile.setModificationTimeForce(addCloseOp.mtime);
+      newFile.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
+      newFile.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
       updateBlocks(fsDir, addCloseOp, newFile);
       break;
     }
@@ -319,15 +330,12 @@ public class FSEditLogLoader {
             " clientMachine " + addCloseOp.clientMachine);
       }
 
-      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
-      if (oldFile == null) {
-        throw new IOException("Operation trying to close non-existent file " +
-            addCloseOp.path);
-      }
-      
+      final INodesInPath iip = fsDir.getLastINodeInPath(addCloseOp.path);
+      final INodeFile oldFile = INodeFile.valueOf(iip.getINode(0), addCloseOp.path);
+
       // Update the salient file attributes.
-      oldFile.setAccessTime(addCloseOp.atime);
-      oldFile.setModificationTimeForce(addCloseOp.mtime);
+      oldFile.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
+      oldFile.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
       updateBlocks(fsDir, addCloseOp, oldFile);
 
       // Now close the file
@@ -344,8 +352,8 @@ public class FSEditLogLoader {
       if (oldFile.isUnderConstruction()) {
         INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
         fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
-        INodeFile newFile = ucFile.convertToInodeFile();
-        fsDir.unprotectedReplaceNode(addCloseOp.path, ucFile, newFile);
+        INodeFile newFile = ucFile.toINodeFile(ucFile.getModificationTime());
+        fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile);
       }
       break;
     }
@@ -355,13 +363,8 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + updateOp.path +
             " numblocks : " + updateOp.blocks.length);
       }
-      INodeFile oldFile = getINodeFile(fsDir, updateOp.path);
-      if (oldFile == null) {
-        throw new IOException(
-            "Operation trying to update blocks in non-existent file " +
-            updateOp.path);
-      }
-      
+      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(updateOp.path),
+          updateOp.path);
       // Update in-memory data structures
       updateBlocks(fsDir, updateOp, oldFile);
       break;
@@ -510,6 +513,44 @@ public class FSEditLogLoader {
       // no data in here currently.
       break;
     }
+    case OP_CREATE_SNAPSHOT: {
+      CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
+      fsNamesys.getSnapshotManager().createSnapshot(
+          createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName);
+      break;
+    }
+    case OP_DELETE_SNAPSHOT: {
+      DeleteSnapshotOp deleteSnapshotOp = (DeleteSnapshotOp) op;
+      BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+      List<INode> removedINodes = new ArrayList<INode>();
+      fsNamesys.getSnapshotManager().deleteSnapshot(
+          deleteSnapshotOp.snapshotRoot, deleteSnapshotOp.snapshotName,
+          collectedBlocks, removedINodes);
+      fsNamesys.removeBlocks(collectedBlocks);
+      collectedBlocks.clear();
+      fsNamesys.dir.removeFromInodeMap(removedINodes);
+      removedINodes.clear();
+      break;
+    }
+    case OP_RENAME_SNAPSHOT: {
+      RenameSnapshotOp renameSnapshotOp = (RenameSnapshotOp) op;
+      fsNamesys.getSnapshotManager().renameSnapshot(
+          renameSnapshotOp.snapshotRoot, renameSnapshotOp.snapshotOldName,
+          renameSnapshotOp.snapshotNewName);
+      break;
+    }
+    case OP_ALLOW_SNAPSHOT: {
+      AllowSnapshotOp allowSnapshotOp = (AllowSnapshotOp) op;
+      fsNamesys.getSnapshotManager().setSnapshottable(
+          allowSnapshotOp.snapshotRoot, false);
+      break;
+    }
+    case OP_DISALLOW_SNAPSHOT: {
+      DisallowSnapshotOp disallowSnapshotOp = (DisallowSnapshotOp) op;
+      fsNamesys.getSnapshotManager().resetSnapshottable(
+          disallowSnapshotOp.snapshotRoot);
+      break;
+    }
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }
@@ -532,18 +573,7 @@ public class FSEditLogLoader {
     }
     return sb.toString();
   }
-  
-  private static INodeFile getINodeFile(FSDirectory fsDir, String path)
-      throws IOException {
-    INode inode = fsDir.getINode(path);
-    if (inode != null) {
-      if (!(inode instanceof INodeFile)) {
-        throw new IOException("Operation trying to get non-file " + path);
-      }
-    }
-    return (INodeFile)inode;
-  }
-  
+
   /**
    * Update in-memory data structures with new block information.
    * @throws IOException
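
The loader side of the change replays the new snapshot opcodes against the SnapshotManager, as the OP_*_SNAPSHOT cases above show. The toy replay loop below mirrors the shape of that switch with a trivial in-memory map instead of the real namesystem; it is a sketch only, and the paths and snapshot names are invented example values.

import java.util.*;

// Toy replay sketch: mirrors the shape of the new OP_*_SNAPSHOT cases in
// FSEditLogLoader, using a trivial in-memory state instead of the real
// SnapshotManager. Illustrative only.
public class SnapshotReplaySketch {
  enum OpCode { CREATE_SNAPSHOT, DELETE_SNAPSHOT, RENAME_SNAPSHOT, ALLOW_SNAPSHOT, DISALLOW_SNAPSHOT }

  static class Op {
    final OpCode code;
    final String root;
    final String name;      // snapshot name (old name for RENAME_SNAPSHOT)
    final String newName;   // only used by RENAME_SNAPSHOT
    Op(OpCode code, String root, String name, String newName) {
      this.code = code; this.root = root; this.name = name; this.newName = newName;
    }
  }

  // root path -> set of snapshot names; presence of the key means "snapshottable"
  private final Map<String, Set<String>> state = new HashMap<>();

  void apply(Op op) {
    switch (op.code) {
      case ALLOW_SNAPSHOT:
        state.putIfAbsent(op.root, new TreeSet<>());
        break;
      case DISALLOW_SNAPSHOT:
        state.remove(op.root);
        break;
      case CREATE_SNAPSHOT:
        // assumes an earlier ALLOW_SNAPSHOT made op.root snapshottable
        state.get(op.root).add(op.name);
        break;
      case DELETE_SNAPSHOT:
        state.get(op.root).remove(op.name);
        break;
      case RENAME_SNAPSHOT:
        Set<String> snaps = state.get(op.root);
        snaps.remove(op.name);
        snaps.add(op.newName);
        break;
      default:
        throw new IllegalArgumentException("Invalid operation read " + op.code);
    }
  }

  public static void main(String[] args) {
    SnapshotReplaySketch loader = new SnapshotReplaySketch();
    loader.apply(new Op(OpCode.ALLOW_SNAPSHOT, "/user/data", null, null));
    loader.apply(new Op(OpCode.CREATE_SNAPSHOT, "/user/data", "s0", null));
    loader.apply(new Op(OpCode.RENAME_SNAPSHOT, "/user/data", "s0", "s1"));
    System.out.println(loader.state);   // prints {/user/data=[s1]}
  }
}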

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu May  9 23:55:49 2013
@@ -110,6 +110,12 @@ public abstract class FSEditLogOp {
       inst.put(OP_END_LOG_SEGMENT,
                     new LogSegmentOp(OP_END_LOG_SEGMENT));
       inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
+
+      inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp());
+      inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp());
+      inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp());
+      inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp());
+      inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
     }
     
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -2210,6 +2216,309 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /**
+   * Operation corresponding to creating a snapshot
+   */
+  static class CreateSnapshotOp extends FSEditLogOp {
+    String snapshotRoot;
+    String snapshotName;
+    
+    public CreateSnapshotOp() {
+      super(OP_CREATE_SNAPSHOT);
+    }
+    
+    static CreateSnapshotOp getInstance(OpInstanceCache cache) {
+      return (CreateSnapshotOp)cache.get(OP_CREATE_SNAPSHOT);
+    }
+    
+    CreateSnapshotOp setSnapshotName(String snapName) {
+      this.snapshotName = snapName;
+      return this;
+    }
+
+    public CreateSnapshotOp setSnapshotRoot(String snapRoot) {
+      snapshotRoot = snapRoot;
+      return this;
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      snapshotRoot = FSImageSerialization.readString(in);
+      snapshotName = FSImageSerialization.readString(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(snapshotRoot, out);
+      FSImageSerialization.writeString(snapshotName, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      snapshotRoot = st.getValue("SNAPSHOTROOT");
+      snapshotName = st.getValue("SNAPSHOTNAME");
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("CreateSnapshotOp [snapshotRoot=");
+      builder.append(snapshotRoot);
+      builder.append(", snapshotName=");
+      builder.append(snapshotName);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+  
+  /**
+   * Operation corresponding to delete a snapshot
+   */
+  static class DeleteSnapshotOp extends FSEditLogOp {
+    String snapshotRoot;
+    String snapshotName;
+    
+    DeleteSnapshotOp() {
+      super(OP_DELETE_SNAPSHOT);
+    }
+    
+    static DeleteSnapshotOp getInstance(OpInstanceCache cache) {
+      return (DeleteSnapshotOp)cache.get(OP_DELETE_SNAPSHOT);
+    }
+    
+    DeleteSnapshotOp setSnapshotName(String snapName) {
+      this.snapshotName = snapName;
+      return this;
+    }
+
+    DeleteSnapshotOp setSnapshotRoot(String snapRoot) {
+      snapshotRoot = snapRoot;
+      return this;
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      snapshotRoot = FSImageSerialization.readString(in);
+      snapshotName = FSImageSerialization.readString(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(snapshotRoot, out);
+      FSImageSerialization.writeString(snapshotName, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      snapshotRoot = st.getValue("SNAPSHOTROOT");
+      snapshotName = st.getValue("SNAPSHOTNAME");
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("DeleteSnapshotOp [snapshotRoot=");
+      builder.append(snapshotRoot);
+      builder.append(", snapshotName=");
+      builder.append(snapshotName);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+  
+  /**
+   * Operation corresponding to rename a snapshot
+   */
+  static class RenameSnapshotOp extends FSEditLogOp {
+    String snapshotRoot;
+    String snapshotOldName;
+    String snapshotNewName;
+    
+    RenameSnapshotOp() {
+      super(OP_RENAME_SNAPSHOT);
+    }
+    
+    static RenameSnapshotOp getInstance(OpInstanceCache cache) {
+      return (RenameSnapshotOp) cache.get(OP_RENAME_SNAPSHOT);
+    }
+    
+    RenameSnapshotOp setSnapshotOldName(String snapshotOldName) {
+      this.snapshotOldName = snapshotOldName;
+      return this;
+    }
+
+    RenameSnapshotOp setSnapshotNewName(String snapshotNewName) {
+      this.snapshotNewName = snapshotNewName;
+      return this;
+    }
+    
+    RenameSnapshotOp setSnapshotRoot(String snapshotRoot) {
+      this.snapshotRoot = snapshotRoot;
+      return this;
+    }
+    
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      snapshotRoot = FSImageSerialization.readString(in);
+      snapshotOldName = FSImageSerialization.readString(in);
+      snapshotNewName = FSImageSerialization.readString(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(snapshotRoot, out);
+      FSImageSerialization.writeString(snapshotOldName, out);
+      FSImageSerialization.writeString(snapshotNewName, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName);
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      snapshotRoot = st.getValue("SNAPSHOTROOT");
+      snapshotOldName = st.getValue("SNAPSHOTOLDNAME");
+      snapshotNewName = st.getValue("SNAPSHOTNEWNAME");
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("RenameSnapshotOp [snapshotRoot=");
+      builder.append(snapshotRoot);
+      builder.append(", snapshotOldName=");
+      builder.append(snapshotOldName);
+      builder.append(", snapshotNewName=");
+      builder.append(snapshotNewName);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /**
+   * Operation corresponding to allow creating snapshot on a directory
+   */
+  static class AllowSnapshotOp extends FSEditLogOp {
+    String snapshotRoot;
+
+    public AllowSnapshotOp() {
+      super(OP_ALLOW_SNAPSHOT);
+    }
+
+    public AllowSnapshotOp(String snapRoot) {
+      super(OP_ALLOW_SNAPSHOT);
+      snapshotRoot = snapRoot;
+    }
+
+    static AllowSnapshotOp getInstance(OpInstanceCache cache) {
+      return (AllowSnapshotOp) cache.get(OP_ALLOW_SNAPSHOT);
+    }
+
+    public AllowSnapshotOp setSnapshotRoot(String snapRoot) {
+      snapshotRoot = snapRoot;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      snapshotRoot = FSImageSerialization.readString(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(snapshotRoot, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      snapshotRoot = st.getValue("SNAPSHOTROOT");
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("AllowSnapshotOp [snapshotRoot=");
+      builder.append(snapshotRoot);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /**
+   * Operation corresponding to disallow creating snapshot on a directory
+   */
+  static class DisallowSnapshotOp extends FSEditLogOp {
+    String snapshotRoot;
+
+    public DisallowSnapshotOp() {
+      super(OP_DISALLOW_SNAPSHOT);
+    }
+
+    public DisallowSnapshotOp(String snapRoot) {
+      super(OP_DISALLOW_SNAPSHOT);
+      snapshotRoot = snapRoot;
+    }
+
+    static DisallowSnapshotOp getInstance(OpInstanceCache cache) {
+      return (DisallowSnapshotOp) cache.get(OP_DISALLOW_SNAPSHOT);
+    }
+
+    public DisallowSnapshotOp setSnapshotRoot(String snapRoot) {
+      snapshotRoot = snapRoot;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      snapshotRoot = FSImageSerialization.readString(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(snapshotRoot, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      snapshotRoot = st.getValue("SNAPSHOTROOT");
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("DisallowSnapshotOp [snapshotRoot=");
+      builder.append(snapshotRoot);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
   static private short readShort(DataInputStream in) throws IOException {
     return Short.parseShort(FSImageSerialization.readString(in));
   }
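
Each new op class above pairs a writeFields() with a readFields() that consumes the fields in the same order, using FSImageSerialization.writeString/readString. The round-trip sketch below shows that ordering with plain DataOutputStream.writeUTF/readUTF instead of the fsimage string encoding, so it illustrates the field layout rather than the exact on-disk bytes.

import java.io.*;

// Round-trip sketch for the writeFields()/readFields() pairs added above.
// Uses writeUTF/readUTF purely for illustration; the real ops serialize via
// FSImageSerialization.writeString/readString.
public class SnapshotOpSerializationSketch {
  static void writeFields(DataOutputStream out, String snapshotRoot,
      String snapshotName) throws IOException {
    out.writeUTF(snapshotRoot);   // field order must match readFields()
    out.writeUTF(snapshotName);
  }

  static String[] readFields(DataInputStream in) throws IOException {
    String snapshotRoot = in.readUTF();
    String snapshotName = in.readUTF();
    return new String[] { snapshotRoot, snapshotName };
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeFields(new DataOutputStream(buf), "/user/data", "s0");

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()));
    String[] fields = readFields(in);
    System.out.println("snapshotRoot=" + fields[0] + ", snapshotName=" + fields[1]);
  }
}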

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Thu May  9 23:55:49 2013
@@ -56,7 +56,12 @@ public enum FSEditLogOpCodes {
   OP_REASSIGN_LEASE             ((byte) 22),
   OP_END_LOG_SEGMENT            ((byte) 23),
   OP_START_LOG_SEGMENT          ((byte) 24),
-  OP_UPDATE_BLOCKS              ((byte) 25);
+  OP_UPDATE_BLOCKS              ((byte) 25),
+  OP_CREATE_SNAPSHOT            ((byte) 26),
+  OP_DELETE_SNAPSHOT            ((byte) 27),
+  OP_RENAME_SNAPSHOT            ((byte) 28),
+  OP_ALLOW_SNAPSHOT             ((byte) 29),
+  OP_DISALLOW_SNAPSHOT          ((byte) 30);
 
   private byte opCode;
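
Each opcode above carries an explicit byte value (26 through 30 for the snapshot operations) so that an opcode byte read from an edit log segment can be mapped back to its enum constant. The sketch below builds such a reverse lookup for just the five new codes; it is not the real FSEditLogOpCodes class.

import java.util.HashMap;
import java.util.Map;

// Sketch of a byte -> opcode reverse lookup for the new snapshot opcodes.
// Illustrative only.
public class SnapshotOpCodesSketch {
  enum Code {
    OP_CREATE_SNAPSHOT((byte) 26),
    OP_DELETE_SNAPSHOT((byte) 27),
    OP_RENAME_SNAPSHOT((byte) 28),
    OP_ALLOW_SNAPSHOT((byte) 29),
    OP_DISALLOW_SNAPSHOT((byte) 30);

    private final byte opCode;
    Code(byte opCode) { this.opCode = opCode; }
    byte getOpCode() { return opCode; }
  }

  private static final Map<Byte, Code> BY_BYTE = new HashMap<>();
  static {
    for (Code c : Code.values()) {
      BY_BYTE.put(c.getOpCode(), c);   // build the reverse map once
    }
  }

  public static void main(String[] args) {
    // A byte read from an edit log segment maps back to its opcode.
    System.out.println(BY_BYTE.get((byte) 28));   // OP_RENAME_SNAPSHOT
  }
}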
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu May  9 23:55:49 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.Time.now;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -31,24 +33,23 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
 import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.util.Time.now;
-
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-
 import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -62,9 +63,6 @@ import org.apache.hadoop.hdfs.util.MD5Fi
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.IdGenerator;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -737,11 +735,58 @@ public class FSImage implements Closeabl
     } finally {
       FSEditLog.closeAllStreams(editStreams);
       // update the counts
-      target.dir.updateCountForINodeWithQuota();   
+      updateCountForQuota(target.dir.rootDir);   
     }
     return lastAppliedTxId - prevLastAppliedTxId;
   }
 
+  /**
+   * Update the count of each directory with quota in the namespace.
+   * A directory's count is defined as the total number inodes in the tree
+   * rooted at the directory.
+   * 
+   * This is an update of existing state of the filesystem and does not
+   * throw QuotaExceededException.
+   */
+  static void updateCountForQuota(INodeDirectoryWithQuota root) {
+    updateCountForQuotaRecursively(root, Quota.Counts.newInstance());
+  }
+  
+  private static void updateCountForQuotaRecursively(INodeDirectory dir,
+      Quota.Counts counts) {
+    final long parentNamespace = counts.get(Quota.NAMESPACE);
+    final long parentDiskspace = counts.get(Quota.DISKSPACE);
+
+    dir.computeQuotaUsage4CurrentDirectory(counts);
+    
+    for (INode child : dir.getChildrenList(null)) {
+      if (child.isDirectory()) {
+        updateCountForQuotaRecursively(child.asDirectory(), counts);
+      } else {
+        // file or symlink: count here to reduce recursive calls.
+        child.computeQuotaUsage(counts, false);
+      }
+    }
+      
+    if (dir.isQuotaSet()) {
+      // check if quota is violated. It indicates a software bug.
+      final long namespace = counts.get(Quota.NAMESPACE) - parentNamespace;
+      if (Quota.isViolated(dir.getNsQuota(), namespace)) {
+        LOG.error("BUG: Namespace quota violation in image for "
+            + dir.getFullPathName()
+            + " quota = " + dir.getNsQuota() + " < consumed = " + namespace);
+      }
+
+      final long diskspace = counts.get(Quota.DISKSPACE) - parentDiskspace;
+      if (Quota.isViolated(dir.getDsQuota(), diskspace)) {
+        LOG.error("BUG: Diskspace quota violation in image for "
+            + dir.getFullPathName()
+            + " quota = " + dir.getDsQuota() + " < consumed = " + diskspace);
+      }
+
+      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(namespace, diskspace);
+    }
+  }
 
   /**
    * Load the image namespace from the given image file, verifying
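
updateCountForQuota() above replaces FSDirectory.updateCountForINodeWithQuota(): it walks the whole tree once after edits are applied, accumulating namespace and diskspace counts and logging, rather than throwing, when a quota turns out to be violated. The toy version below counts only namespace usage (number of inodes) over a hand-built tree; the node names and quota values are invented and the real INodeDirectory API is not used.

import java.util.ArrayList;
import java.util.List;

// Toy version of updateCountForQuotaRecursively(): walk a directory tree,
// count the inodes under each directory, and report any directory whose
// count exceeds its quota. Illustrative only.
public class QuotaCountSketch {
  static class Node {
    final String name;
    final long nsQuota;              // -1 means "no quota set"
    final List<Node> children = new ArrayList<>();
    Node(String name, long nsQuota) { this.name = name; this.nsQuota = nsQuota; }
  }

  // Returns the number of inodes in the subtree rooted at n (including n).
  static long countNamespace(Node n) {
    long count = 1;                  // this inode itself
    for (Node child : n.children) {
      count += countNamespace(child);
    }
    if (n.nsQuota >= 0 && count > n.nsQuota) {
      System.err.println("BUG: namespace quota violation at " + n.name
          + " quota = " + n.nsQuota + " < consumed = " + count);
    }
    return count;
  }

  public static void main(String[] args) {
    Node root = new Node("/", -1);
    Node dir = new Node("/user", 2);       // quota of 2 inodes, deliberately too small
    dir.children.add(new Node("/user/a", -1));
    dir.children.add(new Node("/user/b", -1));
    root.children.add(dir);
    System.out.println("total inodes = " + countNamespace(root));
  }
}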

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java Thu May  9 23:55:49 2013
@@ -17,23 +17,23 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
-import org.apache.hadoop.io.Text;
-
 /**
  * Simple container class that handles support for compressed fsimage files.
  */
@@ -108,15 +108,14 @@ class FSImageCompression {
    * underlying IO fails.
    */
   static FSImageCompression readCompressionHeader(
-    Configuration conf,
-    DataInputStream dis) throws IOException
+    Configuration conf, DataInput in) throws IOException
   {
-    boolean isCompressed = dis.readBoolean();
+    boolean isCompressed = in.readBoolean();
 
     if (!isCompressed) {
       return createNoopCompression();
     } else {
-      String codecClassName = Text.readString(dis);
+      String codecClassName = Text.readString(in);
       return createCompression(conf, codecClassName);
     }
   }
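
readCompressionHeader() now accepts any DataInput; the header itself is just a boolean flag followed, only when the flag is true, by the codec class name. The sketch below writes and reads that layout with writeUTF/readUTF instead of org.apache.hadoop.io.Text, so it shows the structure rather than the exact fsimage encoding; the GzipCodec class name is only an example value.

import java.io.*;

// Sketch of the compression-header layout read by readCompressionHeader():
// a boolean, then the codec class name only when the boolean is true.
// Illustrative only.
public class CompressionHeaderSketch {
  static void writeHeader(DataOutput out, String codecClassName) throws IOException {
    boolean isCompressed = codecClassName != null;
    out.writeBoolean(isCompressed);
    if (isCompressed) {
      out.writeUTF(codecClassName);
    }
  }

  static String readHeader(DataInput in) throws IOException {
    boolean isCompressed = in.readBoolean();
    return isCompressed ? in.readUTF() : null;   // null means "no compression"
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeHeader(new DataOutputStream(buf), "org.apache.hadoop.io.compress.GzipCodec");
    String codec = readHeader(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    System.out.println("codec = " + codec);
  }
}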

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Thu May  9 23:55:49 2013
@@ -19,18 +19,21 @@ package org.apache.hadoop.hdfs.server.na
 
 import static org.apache.hadoop.util.Time.now;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.DigestInputStream;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -38,14 +41,24 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotDirectoryException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 
@@ -56,13 +69,14 @@ import org.apache.hadoop.io.Text;
  * In particular, the format of the FSImage looks like:
  * <pre>
  * FSImage {
- *   LayoutVersion: int, NamespaceID: int, NumberItemsInFSDirectoryTree: long,
- *   NamesystemGenerationStamp: long, TransactionID: long
+ *   layoutVersion: int, namespaceID: int, numberItemsInFSDirectoryTree: long,
+ *   namesystemGenerationStamp: long, transactionID: long, 
+ *   snapshotCounter: int, numberOfSnapshots: int, numOfSnapshottableDirs: int,
  *   {FSDirectoryTree, FilesUnderConstruction, SecretManagerState} (can be compressed)
  * }
  * 
  * FSDirectoryTree (if {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is supported) {
- *   INodeInfo of root, NumberOfChildren of root: int
+ *   INodeInfo of root, numberOfChildren of root: int
  *   [list of INodeInfo of root's children],
  *   [list of INodeDirectoryInfo of root's directory children]
  * }
@@ -73,38 +87,83 @@ import org.apache.hadoop.io.Text;
  * 
  * INodeInfo {
  *   {
- *     LocalName: short + byte[]
+ *     localName: short + byte[]
  *   } when {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is supported
  *   or 
  *   {
- *     FullPath: byte[]
+ *     fullPath: byte[]
  *   } when {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is not supported
- *   ReplicationFactor: short, ModificationTime: long,
- *   AccessTime: long, PreferredBlockSize: long,
- *   NumberOfBlocks: int (-1 for INodeDirectory, -2 for INodeSymLink),
+ *   replicationFactor: short, modificationTime: long,
+ *   accessTime: long, preferredBlockSize: long,
+ *   numberOfBlocks: int (-1 for INodeDirectory, -2 for INodeSymLink),
  *   { 
- *     NsQuota: long, DsQuota: long, FsPermission: short, PermissionStatus
+ *     nsQuota: long, dsQuota: long, 
+ *     {
+ *       isINodeSnapshottable: byte,
+ *       isINodeWithSnapshot: byte (if isINodeSnapshottable is false)
+ *     } (when {@link Feature#SNAPSHOT} is supported), 
+ *     fsPermission: short, PermissionStatus
  *   } for INodeDirectory
  *   or 
  *   {
- *     SymlinkString, FsPermission: short, PermissionStatus
+ *     symlinkString, fsPermission: short, PermissionStatus
  *   } for INodeSymlink
  *   or
  *   {
- *     [list of BlockInfo], FsPermission: short, PermissionStatus
+ *     [list of BlockInfo]
+ *     [list of FileDiff]
+ *     {
+ *       isINodeFileUnderConstructionSnapshot: byte, 
+ *       {clientName: short + byte[], clientMachine: short + byte[]} (when 
+ *       isINodeFileUnderConstructionSnapshot is true),
+ *     } (when {@link Feature#SNAPSHOT} is supported and writing snapshotINode), 
+ *     fsPermission: short, PermissionStatus
  *   } for INodeFile
  * }
  * 
  * INodeDirectoryInfo {
- *   FullPath of the directory: short + byte[],
- *   NumberOfChildren: int, [list of INodeInfo of children INode]
- *   [list of INodeDirectoryInfo of the directory children]
+ *   fullPath of the directory: short + byte[],
+ *   numberOfChildren: int, [list of INodeInfo of children INode],
+ *   {
+ *     numberOfSnapshots: int,
+ *     [list of Snapshot] (when NumberOfSnapshots is positive),
+ *     numberOfDirectoryDiffs: int,
+ *     [list of DirectoryDiff] (NumberOfDirectoryDiffs is positive),
+ *     number of children that are directories,
+ *     [list of INodeDirectoryInfo of the directory children] (includes
+ *     snapshot copies of deleted sub-directories)
+ *   } (when {@link Feature#SNAPSHOT} is supported), 
+ * }
+ * 
+ * Snapshot {
+ *   snapshotID: int, root of Snapshot: INodeDirectoryInfo (its local name is 
+ *   the name of the snapshot)
+ * }
+ * 
+ * DirectoryDiff {
+ *   full path of the root of the associated Snapshot: short + byte[], 
+ *   childrenSize: int, 
+ *   isSnapshotRoot: byte, 
+ *   snapshotINodeIsNotNull: byte (when isSnapshotRoot is false),
+ *   snapshotINode: INodeDirectory (when SnapshotINodeIsNotNull is true), Diff 
+ * }
+ * 
+ * Diff {
+ *   createdListSize: int, [Local name of INode in created list],
+ *   deletedListSize: int, [INode in deleted list: INodeInfo]
+ * }
+ *
+ * FileDiff {
+ *   full path of the root of the associated Snapshot: short + byte[], 
+ *   fileSize: long, 
+ *   snapshotINodeIsNotNull: byte,
+ *   snapshotINode: INodeFile (when SnapshotINodeIsNotNull is true), Diff 
  * }
  * </pre>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-class FSImageFormat {
+public class FSImageFormat {
   private static final Log LOG = FSImage.LOG;
   
   // Static-only class
@@ -115,7 +174,7 @@ class FSImageFormat {
    * should be called once, after which the getter methods may be used to retrieve
    * information about the image that was loaded, if loading was successful.
    */
-  static class Loader {
+  public static class Loader {
     private final Configuration conf;
     /** which namesystem this loader is working for */
     private final FSNamesystem namesystem;
@@ -127,6 +186,9 @@ class FSImageFormat {
     private long imgTxId;
     /** The MD5 sum of the loaded file */
     private MD5Hash imgDigest;
+    
+    private Map<Integer, Snapshot> snapshotMap = null;
+    private final ReferenceMap referenceMap = new ReferenceMap();
 
     Loader(Configuration conf, FSNamesystem namesystem) {
       this.conf = conf;
@@ -165,9 +227,7 @@ class FSImageFormat {
       }
     }
 
-    void load(File curFile)
-      throws IOException
-    {
+    void load(File curFile) throws IOException {
       checkNotLoaded();
       assert curFile != null : "curFile is null";
 
@@ -189,6 +249,8 @@ class FSImageFormat {
               "imgVersion " + imgVersion +
               " expected to be " + getLayoutVersion());
         }
+        boolean supportSnapshot = LayoutVersion.supports(Feature.SNAPSHOT,
+            imgVersion);
 
         // read namespaceID: first appeared in version -2
         in.readInt();
@@ -221,6 +283,10 @@ class FSImageFormat {
           }
         }
         
+        if (supportSnapshot) {
+          snapshotMap = namesystem.getSnapshotManager().read(in, this);
+        }
+
         // read compression related info
         FSImageCompression compression;
         if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imgVersion)) {
@@ -236,12 +302,16 @@ class FSImageFormat {
         LOG.info("Number of files = " + numFiles);
         if (LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
             imgVersion)) {
-          loadLocalNameINodes(numFiles, in);
+          if (supportSnapshot) {
+            loadLocalNameINodesWithSnapshot(in);
+          } else {
+            loadLocalNameINodes(numFiles, in);
+          }
         } else {
           loadFullNameINodes(numFiles, in);
         }
 
-        loadFilesUnderConstruction(in);
+        loadFilesUnderConstruction(in, supportSnapshot);
 
         loadSecretManagerState(in);
 
@@ -260,17 +330,35 @@ class FSImageFormat {
     }
 
   /** Update the root node's attributes */
-  private void updateRootAttr(INode root) {                                                           
+  private void updateRootAttr(INodeWithAdditionalFields root) {                                                           
     long nsQuota = root.getNsQuota();
     long dsQuota = root.getDsQuota();
     FSDirectory fsDir = namesystem.dir;
     if (nsQuota != -1 || dsQuota != -1) {
       fsDir.rootDir.setQuota(nsQuota, dsQuota);
     }
-    fsDir.rootDir.setModificationTime(root.getModificationTime());
+    fsDir.rootDir.cloneModificationTime(root);
     fsDir.rootDir.clonePermissionStatus(root);    
   }
-
+  
+    /**
+     * Load fsimage files when 1) only local names are stored, 
+     * and 2) snapshot is supported.
+     * 
+     * @param in Image input stream
+     */
+    private void loadLocalNameINodesWithSnapshot(DataInput in)
+        throws IOException {
+      assert LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
+          getLayoutVersion());
+      assert LayoutVersion.supports(Feature.SNAPSHOT, getLayoutVersion());
+      
+      // load root
+      loadRoot(in);
+      // load rest of the nodes recursively
+      loadDirectoryWithSnapshot(in);
+    }
+    
   /** 
    * load fsimage files assuming only local names are stored
    *   
@@ -278,20 +366,16 @@ class FSImageFormat {
    * @param in image input stream
    * @throws IOException
    */  
-   private void loadLocalNameINodes(long numFiles, DataInputStream in)
-        throws IOException {
+   private void loadLocalNameINodes(long numFiles, DataInput in) 
+       throws IOException {
      assert LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
          getLayoutVersion());
      assert numFiles > 0;
 
      // load root
-     if( in.readShort() != 0) {
-       throw new IOException("First node is not root");
-     }   
-     INode root = loadINode(in);
-     // update the root's attributes
-     updateRootAttr(root);
-     numFiles--;
+     loadRoot(in);
+     // have loaded the first file (the root)
+     numFiles--; 
 
      // load rest of the nodes directory by directory
      while (numFiles > 0) {
@@ -302,6 +386,78 @@ class FSImageFormat {
      }
    }
    
+    /**
+     * Load information about root, and use the information to update the root
+     * directory of NameSystem.
+     * @param in The {@link DataInput} instance to read.
+     */
+    private void loadRoot(DataInput in) throws IOException {
+      // load root
+      if (in.readShort() != 0) {
+        throw new IOException("First node is not root");
+      }
+      final INodeDirectory root = loadINode(null, false, in).asDirectory();
+      // update the root's attributes
+      updateRootAttr(root);
+    }
+   
+    /** Load children nodes for the parent directory. */
+    private int loadChildren(INodeDirectory parent, DataInput in)
+        throws IOException {
+      int numChildren = in.readInt();
+      for (int i = 0; i < numChildren; i++) {
+        // load single inode
+        INode newNode = loadINodeWithLocalName(false, in);
+        addToParent(parent, newNode);
+      }
+      return numChildren;
+    }
+    
+    /**
+     * Load a directory when snapshot is supported.
+     * @param in The {@link DataInput} instance to read.
+     */
+    private void loadDirectoryWithSnapshot(DataInput in)
+        throws IOException {
+      // Step 1. Identify the parent INode
+      long inodeId = in.readLong();
+      final INodeDirectory parent = this.namesystem.dir.getInode(inodeId)
+          .asDirectory();
+      
+      // Check if the whole subtree has been saved (for reference nodes)
+      boolean toLoadSubtree = referenceMap.toProcessSubtree(parent.getId());
+      if (!toLoadSubtree) {
+        return;
+      }
+      
+      // Step 2. Load snapshots if parent is snapshottable
+      int numSnapshots = in.readInt();
+      if (numSnapshots >= 0) {
+        final INodeDirectorySnapshottable snapshottableParent
+            = INodeDirectorySnapshottable.valueOf(parent, parent.getLocalName());
+        if (snapshottableParent.getParent() != null) { // not root
+          this.namesystem.getSnapshotManager().addSnapshottable(
+              snapshottableParent);
+        }
+        // load snapshots and snapshotQuota
+        SnapshotFSImageFormat.loadSnapshotList(snapshottableParent,
+            numSnapshots, in, this);
+      }
+
+      // Step 3. Load children nodes under parent
+      loadChildren(parent, in);
+      
+      // Step 4. load Directory Diff List
+      SnapshotFSImageFormat.loadDirectoryDiffList(parent, in, this);
+      
+      // Recursively load sub-directories, including snapshot copies of deleted
+      // directories
+      int numSubTree = in.readInt();
+      for (int i = 0; i < numSubTree; i++) {
+        loadDirectoryWithSnapshot(in);
+      }
+    }
+    
    /**
     * Load all children of a directory
     * 
@@ -309,24 +465,11 @@ class FSImageFormat {
     * @return number of child inodes read
     * @throws IOException
     */
-   private int loadDirectory(DataInputStream in) throws IOException {
+   private int loadDirectory(DataInput in) throws IOException {
      String parentPath = FSImageSerialization.readString(in);
-     FSDirectory fsDir = namesystem.dir;
      final INodeDirectory parent = INodeDirectory.valueOf(
-         fsDir.rootDir.getNode(parentPath, true), parentPath);
-
-     int numChildren = in.readInt();
-     for(int i=0; i<numChildren; i++) {
-       // load single inode
-       byte[] localName = new byte[in.readShort()];
-       in.readFully(localName); // read local name
-       INode newNode = loadINode(in); // read rest of inode
-
-       // add to parent
-       newNode.setLocalName(localName);
-       addToParent(parent, newNode);
-     }
-     return numChildren;
+         namesystem.dir.rootDir.getNode(parentPath, true), parentPath);
+     return loadChildren(parent, in);
    }
 
   /**
@@ -337,38 +480,50 @@ class FSImageFormat {
    * @throws IOException if any error occurs
    */
   private void loadFullNameINodes(long numFiles,
-      DataInputStream in) throws IOException {
+      DataInput in) throws IOException {
     byte[][] pathComponents;
     byte[][] parentPath = {{}};      
     FSDirectory fsDir = namesystem.dir;
     INodeDirectory parentINode = fsDir.rootDir;
     for (long i = 0; i < numFiles; i++) {
       pathComponents = FSImageSerialization.readPathComponents(in);
-      INode newNode = loadINode(in);
+      final INode newNode = loadINode(
+          pathComponents[pathComponents.length-1], false, in);
 
       if (isRoot(pathComponents)) { // it is the root
         // update the root's attributes
-        updateRootAttr(newNode);
+        updateRootAttr(newNode.asDirectory());
         continue;
       }
       // check if the new inode belongs to the same parent
       if(!isParent(pathComponents, parentPath)) {
-        parentINode = fsDir.rootDir.getParent(pathComponents);
+        parentINode = getParentINodeDirectory(pathComponents);
         parentPath = getParent(pathComponents);
       }
 
       // add new inode
-      newNode.setLocalName(pathComponents[pathComponents.length-1]);
       addToParent(parentINode, newNode);
     }
   }
 
+  private INodeDirectory getParentINodeDirectory(byte[][] pathComponents
+      ) throws FileNotFoundException, PathIsNotDirectoryException,
+      UnresolvedLinkException {
+    if (pathComponents.length < 2) { // root
+      return null;
+    }
+    // Gets the parent INode
+    final INodesInPath inodes = namesystem.dir.getExistingPathINodes(
+        pathComponents);
+    return INodeDirectory.valueOf(inodes.getINode(-2), pathComponents);
+  }
+
   /**
    * Add the child node to parent and, if child is a file, update block map.
    * This method is only used for image loading so that synchronization,
    * modification time update and space count update are not needed.
    */
-  void addToParent(INodeDirectory parent, INode child) {
+  private void addToParent(INodeDirectory parent, INode child) {
     FSDirectory fsDir = namesystem.dir;
     if (parent == fsDir.rootDir && FSDirectory.isReservedName(child)) {
         throw new HadoopIllegalArgumentException("File name \""
@@ -377,81 +532,166 @@ class FSImageFormat {
             + "name before upgrading to this release.");
     }
     // NOTE: This does not update space counts for parents
-    if (!parent.addChild(child, false)) {
+    if (!parent.addChild(child)) {
       return;
     }
     namesystem.dir.cacheName(child);
 
     if (child.isFile()) {
       // Add file->block mapping
-      final INodeFile file = (INodeFile)child;
+      final INodeFile file = child.asFile();
       final BlockInfo[] blocks = file.getBlocks();
-      final BlockManager bm = namesystem.getBlockManager();
-      for (int i = 0; i < blocks.length; i++) {
-        file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+      if (blocks != null) {
+        final BlockManager bm = namesystem.getBlockManager();
+        for (int i = 0; i < blocks.length; i++) {
+          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+        } 
       }
     }
   }
 
+    /** @return The FSDirectory of the namesystem where the fsimage is loaded */
+    public FSDirectory getFSDirectoryInLoading() {
+      return namesystem.dir;
+    }
+
+    public INode loadINodeWithLocalName(boolean isSnapshotINode,
+        DataInput in) throws IOException {
+      final byte[] localName = FSImageSerialization.readLocalName(in);
+      INode inode = loadINode(localName, isSnapshotINode, in);
+      if (LayoutVersion.supports(Feature.ADD_INODE_ID, getLayoutVersion())) {
+        namesystem.dir.addToInodeMap(inode);
+      }
+      return inode;
+    }
+  
   /**
    * load an inode from fsimage except for its name
    * 
    * @param in data input stream from which image is read
    * @return an inode
    */
-  private INode loadINode(DataInputStream in)
-      throws IOException {
-    long modificationTime = 0;
-    long atime = 0;
-    long blockSize = 0;
-    
-    int imgVersion = getLayoutVersion();
+  INode loadINode(final byte[] localName, boolean isSnapshotINode,
+      DataInput in) throws IOException {
+    final int imgVersion = getLayoutVersion();
+    if (LayoutVersion.supports(Feature.SNAPSHOT, imgVersion)) {
+      namesystem.getFSDirectory().verifyINodeName(localName);
+    }
+
     long inodeId = LayoutVersion.supports(Feature.ADD_INODE_ID, imgVersion) ? 
            in.readLong() : namesystem.allocateNewInodeId();
     
-    short replication = in.readShort();
-    replication = namesystem.getBlockManager().adjustReplication(replication);
-    modificationTime = in.readLong();
+    final short replication = namesystem.getBlockManager().adjustReplication(
+        in.readShort());
+    final long modificationTime = in.readLong();
+    long atime = 0;
     if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, imgVersion)) {
       atime = in.readLong();
     }
-    blockSize = in.readLong();
-    int numBlocks = in.readInt();
-    BlockInfo blocks[] = null;
+    final long blockSize = in.readLong();
+    final int numBlocks = in.readInt();
 
     if (numBlocks >= 0) {
-      blocks = new BlockInfo[numBlocks];
-      for (int j = 0; j < numBlocks; j++) {
-        blocks[j] = new BlockInfo(replication);
-        blocks[j].readFields(in);
+      // file
+      
+      // read blocks
+      BlockInfo[] blocks = null;
+      if (numBlocks >= 0) {
+        blocks = new BlockInfo[numBlocks];
+        for (int j = 0; j < numBlocks; j++) {
+          blocks[j] = new BlockInfo(replication);
+          blocks[j].readFields(in);
+        }
       }
-    }
-    
-    // get quota only when the node is a directory
-    long nsQuota = -1L;
-    if (blocks == null && numBlocks == -1) {
-      nsQuota = in.readLong();
-    }
-    long dsQuota = -1L;
-    if (LayoutVersion.supports(Feature.DISKSPACE_QUOTA, imgVersion)
-        && blocks == null && numBlocks == -1) {
-      dsQuota = in.readLong();
-    }
 
-    // Read the symlink only when the node is a symlink
-    String symlink = "";
-    if (numBlocks == -2) {
-      symlink = Text.readString(in);
+      String clientName = "";
+      String clientMachine = "";
+      boolean underConstruction = false;
+      FileDiffList fileDiffs = null;
+      if (LayoutVersion.supports(Feature.SNAPSHOT, imgVersion)) {
+        // read diffs
+        fileDiffs = SnapshotFSImageFormat.loadFileDiffList(in, this);
+
+        if (isSnapshotINode) {
+          underConstruction = in.readBoolean();
+          if (underConstruction) {
+            clientName = FSImageSerialization.readString(in);
+            clientMachine = FSImageSerialization.readString(in);
+          }
+        }
+      }
+
+      final PermissionStatus permissions = PermissionStatus.read(in);
+
+      // return
+      final INodeFile file = new INodeFile(inodeId, localName, permissions,
+          modificationTime, atime, blocks, replication, blockSize);
+      return fileDiffs != null? new INodeFileWithSnapshot(file, fileDiffs)
+          : underConstruction? new INodeFileUnderConstruction(
+              file, clientName, clientMachine, null)
+          : file;
+    } else if (numBlocks == -1) {
+      //directory
+      
+      //read quotas
+      final long nsQuota = in.readLong();
+      long dsQuota = -1L;
+      if (LayoutVersion.supports(Feature.DISKSPACE_QUOTA, imgVersion)) {
+        dsQuota = in.readLong();
+      }
+
+      //read snapshot info
+      boolean snapshottable = false;
+      boolean withSnapshot = false;
+      if (LayoutVersion.supports(Feature.SNAPSHOT, imgVersion)) {
+        snapshottable = in.readBoolean();
+        if (!snapshottable) {
+          withSnapshot = in.readBoolean();
+        }
+      }
+
+      final PermissionStatus permissions = PermissionStatus.read(in);
+
+      //return
+      final INodeDirectory dir = nsQuota >= 0 || dsQuota >= 0?
+          new INodeDirectoryWithQuota(inodeId, localName, permissions,
+              modificationTime, nsQuota, dsQuota)
+          : new INodeDirectory(inodeId, localName, permissions, modificationTime);
+      return snapshottable ? new INodeDirectorySnapshottable(dir)
+          : withSnapshot ? new INodeDirectoryWithSnapshot(dir)
+          : dir;
+    } else if (numBlocks == -2) {
+      //symlink
+
+      final String symlink = Text.readString(in);
+      final PermissionStatus permissions = PermissionStatus.read(in);
+      return new INodeSymlink(inodeId, localName, permissions,
+          modificationTime, atime, symlink);
+    } else if (numBlocks == -3) {
+      //reference
+      
+      final boolean isWithName = in.readBoolean();
+      // lastSnapshotId for WithName node, dstSnapshotId for DstReference node
+      int snapshotId = in.readInt();
+      
+      final INodeReference.WithCount withCount
+          = referenceMap.loadINodeReferenceWithCount(isSnapshotINode, in, this);
+
+      if (isWithName) {
+          return new INodeReference.WithName(null, withCount, localName,
+              snapshotId);
+      } else {
+        final INodeReference ref = new INodeReference.DstReference(null,
+            withCount, snapshotId);
+        return ref;
+      }
     }
     
-    PermissionStatus permissions = PermissionStatus.read(in);
-
-    return INode.newINode(inodeId, permissions, blocks, symlink, replication,
-        modificationTime, atime, nsQuota, dsQuota, blockSize);
+    throw new IOException("Unknown inode type: numBlocks=" + numBlocks);
   }
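
The branches above hinge on the numBlocks field doubling as a record-type discriminator: a non-negative value means a file (and gives the block count), -1 a directory, -2 a symlink, and -3 a reference. A minimal, self-contained sketch of that convention (simplified types and names, not the HDFS classes themselves) might look like:

    import java.io.ByteArrayInputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.IOException;

    class INodeKindSketch {
      enum Kind { FILE, DIRECTORY, SYMLINK, REFERENCE }

      /** Map the numBlocks discriminator to an inode kind. */
      static Kind kindOf(int numBlocks) throws IOException {
        if (numBlocks >= 0) {
          return Kind.FILE;          // numBlocks is the actual block count
        } else if (numBlocks == -1) {
          return Kind.DIRECTORY;
        } else if (numBlocks == -2) {
          return Kind.SYMLINK;
        } else if (numBlocks == -3) {
          return Kind.REFERENCE;
        }
        throw new IOException("Unknown inode type: numBlocks=" + numBlocks);
      }

      public static void main(String[] args) throws IOException {
        // -1 encoded as a big-endian int, as DataOutput#writeInt would produce
        byte[] encoded = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
        DataInput in = new DataInputStream(new ByteArrayInputStream(encoded));
        System.out.println(kindOf(in.readInt()));   // prints DIRECTORY
      }
    }
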
 
-    private void loadFilesUnderConstruction(DataInputStream in)
-    throws IOException {
+    private void loadFilesUnderConstruction(DataInput in,
+        boolean supportSnapshot) throws IOException {
       FSDirectory fsDir = namesystem.dir;
       int size = in.readInt();
 
@@ -463,13 +703,22 @@ class FSImageFormat {
 
         // verify that file exists in namespace
         String path = cons.getLocalName();
-        INodeFile oldnode = INodeFile.valueOf(fsDir.getINode(path), path);
-        fsDir.replaceNode(path, oldnode, cons);
+        final INodesInPath iip = fsDir.getLastINodeInPath(path);
+        INodeFile oldnode = INodeFile.valueOf(iip.getINode(0), path);
+        cons.setLocalName(oldnode.getLocalNameBytes());
+        cons.setParent(oldnode.getParent());
+
+        if (oldnode instanceof INodeFileWithSnapshot) {
+          cons = new INodeFileUnderConstructionWithSnapshot(cons,
+              ((INodeFileWithSnapshot)oldnode).getDiffs());
+        }
+
+        fsDir.replaceINodeFile(path, oldnode, cons);
         namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
     }
 
-    private void loadSecretManagerState(DataInputStream in)
+    private void loadSecretManagerState(DataInput in)
         throws IOException {
       int imgVersion = getLayoutVersion();
 
@@ -517,6 +766,10 @@ class FSImageFormat {
       }
       return result;
     }
+    
+    public Snapshot getSnapshot(DataInput in) throws IOException {
+      return snapshotMap.get(in.readInt());
+    }
   }
   
   /**
@@ -531,8 +784,7 @@ class FSImageFormat {
     
     /** The MD5 checksum of the file that was written */
     private MD5Hash savedDigest;
-
-    static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
+    private final ReferenceMap referenceMap = new ReferenceMap();
 
     /** @throws IllegalStateException if the instance has not yet saved an image */
     private void checkSaved() {
@@ -561,9 +813,7 @@ class FSImageFormat {
       return savedDigest;
     }
 
-    void save(File newFile,
-              FSImageCompression compression)
-      throws IOException {
+    void save(File newFile, FSImageCompression compression) throws IOException {
       checkNotSaved();
 
       final FSNamesystem sourceNamesystem = context.getSourceNamesystem();
@@ -590,23 +840,22 @@ class FSImageFormat {
         out.writeLong(context.getTxId());
         out.writeLong(sourceNamesystem.getLastInodeId());
         
+        sourceNamesystem.getSnapshotManager().write(out);
+        
         // write compression info and set up compressed stream
         out = compression.writeHeaderAndWrapStream(fos);
         LOG.info("Saving image file " + newFile +
                  " using " + compression);
 
-
-        byte[] byteStore = new byte[4*HdfsConstants.MAX_PATH_LENGTH];
-        ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
         // save the root
-        FSImageSerialization.saveINode2Image(fsDir.rootDir, out);
+        FSImageSerialization.saveINode2Image(fsDir.rootDir, out, false,
+            referenceMap);
         // save the rest of the nodes
-        saveImage(strbuf, fsDir.rootDir, out);
+        saveImage(fsDir.rootDir, out, true);
         // save files under construction
         sourceNamesystem.saveFilesUnderConstruction(out);
         context.checkCancelled();
         sourceNamesystem.saveSecretManagerState(out);
-        strbuf = null;
         context.checkCancelled();
         out.flush();
         context.checkCancelled();
@@ -624,40 +873,97 @@ class FSImageFormat {
     }
 
     /**
-     * Save file tree image starting from the given root.
-     * This is a recursive procedure, which first saves all children of
-     * a current directory and then moves inside the sub-directories.
+     * Save children INodes.
+     * @param children The list of children INodes
+     * @param out The DataOutputStream to write
+     * @return Number of children that are directories
      */
-    private void saveImage(ByteBuffer currentDirName,
-                                  INodeDirectory current,
-                                  DataOutputStream out) throws IOException {
-      final List<INode> children = current.getChildrenList();
-      if (children.isEmpty())
-        return;
-      // print prefix (parent directory name)
-      int prefixLen = currentDirName.position();
-      if (prefixLen == 0) {  // root
-        out.writeShort(PATH_SEPARATOR.length);
-        out.write(PATH_SEPARATOR);
-      } else {  // non-root directories
-        out.writeShort(prefixLen);
-        out.write(currentDirName.array(), 0, prefixLen);
-      }
+    private int saveChildren(ReadOnlyList<INode> children, DataOutputStream out)
+        throws IOException {
+      // Write normal children INodes.
       out.writeInt(children.size());
+      int dirNum = 0;
       int i = 0;
       for(INode child : children) {
         // print all children first
-        FSImageSerialization.saveINode2Image(child, out);
+        FSImageSerialization.saveINode2Image(child, out, false, referenceMap);
+        if (child.isDirectory()) {
+          dirNum++;
+        }
         if (i++ % 50 == 0) {
           context.checkCancelled();
         }
       }
+      return dirNum;
+    }
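
For illustration, a generic version of the count-then-write loop above, with the same every-50-items cancellation check, could be sketched as follows (CancelCheck and Item are placeholders, not HDFS classes):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    class SaveChildrenSketch {
      /** Placeholder for the saver's cancellation hook. */
      interface CancelCheck { void checkCancelled() throws IOException; }

      /** Placeholder child record. */
      static class Item {
        final String name;
        final boolean isDirectory;
        Item(String name, boolean isDirectory) {
          this.name = name;
          this.isDirectory = isDirectory;
        }
      }

      /** Write the child count, then each child; return how many were directories. */
      static int saveChildren(List<Item> children, DataOutputStream out,
          CancelCheck cancel) throws IOException {
        out.writeInt(children.size());
        int dirNum = 0;
        int i = 0;
        for (Item child : children) {
          out.writeUTF(child.name);        // stand-in for saveINode2Image(...)
          if (child.isDirectory) {
            dirNum++;
          }
          if (i++ % 50 == 0) {
            cancel.checkCancelled();       // keep long saves responsive to cancellation
          }
        }
        return dirNum;
      }
    }
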
+    
+    /**
+     * Save file tree image starting from the given root.
+     * This is a recursive procedure, which first saves all children and 
+     * snapshot diffs of the current directory and then moves inside the 
+     * sub-directories.
+     * 
+     * @param current The current node
+     * @param out The DataOutputStream to write the image
+     * @param toSaveSubtree Whether or not to save the subtree to fsimage. For
+     *                      reference node, its subtree may already have been
+     *                      saved before.
+     */
+    private void saveImage(INodeDirectory current, DataOutputStream out,
+        boolean toSaveSubtree) throws IOException {
+      // write the inode id of the directory
+      out.writeLong(current.getId());
+      
+      if (!toSaveSubtree) {
+        return;
+      }
+      
+      final ReadOnlyList<INode> children = current.getChildrenList(null);
+      int dirNum = 0;
+      List<INodeDirectory> snapshotDirs = null;
+      if (current instanceof INodeDirectoryWithSnapshot) {
+        snapshotDirs = new ArrayList<INodeDirectory>();
+        ((INodeDirectoryWithSnapshot) current).getSnapshotDirectory(
+            snapshotDirs);
+        dirNum += snapshotDirs.size();
+      }
+      
+      // 2. Write INodeDirectorySnapshottable#snapshotsByNames to record all
+      // Snapshots
+      if (current instanceof INodeDirectorySnapshottable) {
+        INodeDirectorySnapshottable snapshottableNode = 
+            (INodeDirectorySnapshottable) current;
+        SnapshotFSImageFormat.saveSnapshots(snapshottableNode, out);
+      } else {
+        out.writeInt(-1); // # of snapshots
+      }
+
+      // 3. Write children INode 
+      dirNum += saveChildren(children, out);
+      
+      // 4. Write DirectoryDiff lists, if there is any.
+      SnapshotFSImageFormat.saveDirectoryDiffList(current, out, referenceMap);
+      
+      // Write sub-tree of sub-directories, including possible snapshots of 
+      // deleted sub-directories
+      out.writeInt(dirNum); // the number of sub-directories
       for(INode child : children) {
-        if(!child.isDirectory())
+        if(!child.isDirectory()) {
           continue;
-        currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        saveImage(currentDirName, (INodeDirectory)child, out);
-        currentDirName.position(prefixLen);
+        }
+        // make sure we only save the subtree under a reference node once
+        boolean toSave = child.isReference() ? 
+            referenceMap.toProcessSubtree(child.getId()) : true;
+        saveImage(child.asDirectory(), out, toSave);
+      }
+      if (snapshotDirs != null) {
+        for (INodeDirectory subDir : snapshotDirs) {
+          // make sure we only save the subtree under a reference node once
+          boolean toSave = subDir.getParentReference() != null ? 
+              referenceMap.toProcessSubtree(subDir.getId()) : true;
+          saveImage(subDir, out, toSave);
+        }
       }
     }
   }
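
The recursion above descends into a referenced subtree only the first time it is met; ReferenceMap#toProcessSubtree presumably records which inode ids have already been handled. A self-contained sketch of that save-once pattern (hypothetical Node type and string output, not the HDFS classes):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class SaveOnceSketch {
      /** Hypothetical directory node; referenced nodes may appear under several parents. */
      static class Node {
        final long id;
        final boolean isReference;
        final List<Node> children = new ArrayList<>();
        Node(long id, boolean isReference) {
          this.id = id;
          this.isReference = isReference;
        }
      }

      private final Set<Long> processed = new HashSet<>();

      /** Mirror of ReferenceMap#toProcessSubtree: true only the first time an id is seen. */
      boolean toProcessSubtree(long id) {
        return processed.add(id);
      }

      void saveImage(Node current, StringBuilder out, boolean toSaveSubtree) {
        out.append(current.id).append('\n');   // stand-in for writing the inode record
        if (!toSaveSubtree) {
          return;                              // subtree already written via another reference
        }
        for (Node child : current.children) {
          boolean toSave = child.isReference ? toProcessSubtree(child.id) : true;
          saveImage(child, out, toSave);
        }
      }
    }
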

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1480838&r1=1480837&r2=1480838&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Thu May  9 23:55:49 2013
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.DataInputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
@@ -34,11 +35,17 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Static utility functions for serializing various pieces of data in the correct
  * format for the FSImage file.
@@ -77,11 +84,30 @@ public class FSImageSerialization {
     final FsPermission FILE_PERM = new FsPermission((short) 0);
   }
 
+  private static void writePermissionStatus(INodeWithAdditionalFields inode,
+      DataOutput out) throws IOException {
+    final FsPermission p = TL_DATA.get().FILE_PERM;
+    p.fromShort(inode.getFsPermissionShort());
+    PermissionStatus.write(out, inode.getUserName(), inode.getGroupName(), p);
+  }
+
+  private static void writeBlocks(final Block[] blocks,
+      final DataOutput out) throws IOException {
+    if (blocks == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(blocks.length);
+      for (Block blk : blocks) {
+        blk.write(out);
+      }
+    }
+  }
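
writeBlocks treats a null array the same as an empty one: it always emits a length prefix, so the matching reader can simply loop readInt() times. A stand-alone sketch of that pattern over DataOutput/DataInput (using long values instead of Block objects):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class LengthPrefixedArraySketch {
      static void writeLongs(long[] values, DataOutputStream out) throws IOException {
        if (values == null) {
          out.writeInt(0);                 // null is serialized as an empty array
        } else {
          out.writeInt(values.length);
          for (long v : values) {
            out.writeLong(v);
          }
        }
      }

      static long[] readLongs(DataInputStream in) throws IOException {
        final int n = in.readInt();
        final long[] values = new long[n];
        for (int i = 0; i < n; i++) {
          values[i] = in.readLong();
        }
        return values;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeLongs(null, new DataOutputStream(bytes));
        long[] back = readLongs(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(back.length);   // prints 0
      }
    }
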
+
   // Helper function that reads in an INodeUnderConstruction
   // from the input stream
   //
   static INodeFileUnderConstruction readINodeUnderConstruction(
-      DataInputStream in, FSNamesystem fsNamesys, int imgVersion)
+      DataInput in, FSNamesystem fsNamesys, int imgVersion)
       throws IOException {
     byte[] name = readBytes(in);
     long inodeId = LayoutVersion.supports(Feature.ADD_INODE_ID, imgVersion) ? in
@@ -89,6 +115,7 @@ public class FSImageSerialization {
     short blockReplication = in.readShort();
     long modificationTime = in.readLong();
     long preferredBlockSize = in.readLong();
+  
     int numBlocks = in.readInt();
     BlockInfo[] blocks = new BlockInfo[numBlocks];
     Block blk = new Block();
@@ -133,68 +160,141 @@ public class FSImageSerialization {
                                            throws IOException {
     writeString(path, out);
     out.writeLong(cons.getId());
-    out.writeShort(cons.getBlockReplication());
+    out.writeShort(cons.getFileReplication());
     out.writeLong(cons.getModificationTime());
     out.writeLong(cons.getPreferredBlockSize());
-    int nrBlocks = cons.getBlocks().length;
-    out.writeInt(nrBlocks);
-    for (int i = 0; i < nrBlocks; i++) {
-      cons.getBlocks()[i].write(out);
-    }
+
+    writeBlocks(cons.getBlocks(), out);
     cons.getPermissionStatus().write(out);
+
     writeString(cons.getClientName(), out);
     writeString(cons.getClientMachine(), out);
 
     out.writeInt(0); //  do not store locations of last block
   }
 
-  /*
-   * Save one inode's attributes to the image.
+  /**
+   * Serialize a {@link INodeFile} node
+   * @param file The node to write
+   * @param out The {@link DataOutput} where the fields are written
+   * @param writeUnderConstruction Whether to write under-construction information
    */
-  static void saveINode2Image(INode node,
-                              DataOutputStream out) throws IOException {
-    byte[] name = node.getLocalNameBytes();
-    out.writeShort(name.length);
-    out.write(name);
+  public static void writeINodeFile(INodeFile file, DataOutput out,
+      boolean writeUnderConstruction) throws IOException {
+    writeLocalName(file, out);
+    out.writeLong(file.getId());
+    out.writeShort(file.getFileReplication());
+    out.writeLong(file.getModificationTime());
+    out.writeLong(file.getAccessTime());
+    out.writeLong(file.getPreferredBlockSize());
+
+    writeBlocks(file.getBlocks(), out);
+    SnapshotFSImageFormat.saveFileDiffList(file, out);
+
+    if (writeUnderConstruction) {
+      if (file instanceof INodeFileUnderConstruction) {
+        out.writeBoolean(true);
+        final INodeFileUnderConstruction uc = (INodeFileUnderConstruction)file;
+        writeString(uc.getClientName(), out);
+        writeString(uc.getClientMachine(), out);
+      } else {
+        out.writeBoolean(false);
+      }
+    }
+
+    writePermissionStatus(file, out);
+  }
+
+  /**
+   * Serialize a {@link INodeDirectory}
+   * @param node The node to write
+   * @param out The {@link DataOutput} where the fields are written 
+   */
+  public static void writeINodeDirectory(INodeDirectory node, DataOutput out)
+      throws IOException {
+    writeLocalName(node, out);
     out.writeLong(node.getId());
-    FsPermission filePerm = TL_DATA.get().FILE_PERM;
-    if (node.isDirectory()) {
-      out.writeShort(0);  // replication
-      out.writeLong(node.getModificationTime());
-      out.writeLong(0);   // access time
-      out.writeLong(0);   // preferred block size
-      out.writeInt(-1);   // # of blocks
-      out.writeLong(node.getNsQuota());
-      out.writeLong(node.getDsQuota());
-      filePerm.fromShort(node.getFsPermissionShort());
-      PermissionStatus.write(out, node.getUserName(),
-                             node.getGroupName(),
-                             filePerm);
-    } else if (node.isSymlink()) {
-      out.writeShort(0);  // replication
-      out.writeLong(0);   // modification time
-      out.writeLong(0);   // access time
-      out.writeLong(0);   // preferred block size
-      out.writeInt(-2);   // # of blocks
-      Text.writeString(out, ((INodeSymlink)node).getSymlinkString());
-      filePerm.fromShort(node.getFsPermissionShort());
-      PermissionStatus.write(out, node.getUserName(),
-                             node.getGroupName(),
-                             filePerm);      
+    out.writeShort(0);  // replication
+    out.writeLong(node.getModificationTime());
+    out.writeLong(0);   // access time
+    out.writeLong(0);   // preferred block size
+    out.writeInt(-1);   // # of blocks
+
+    out.writeLong(node.getNsQuota());
+    out.writeLong(node.getDsQuota());
+    if (node instanceof INodeDirectorySnapshottable) {
+      out.writeBoolean(true);
     } else {
-      INodeFile fileINode = (INodeFile)node;
-      out.writeShort(fileINode.getBlockReplication());
-      out.writeLong(fileINode.getModificationTime());
-      out.writeLong(fileINode.getAccessTime());
-      out.writeLong(fileINode.getPreferredBlockSize());
-      Block[] blocks = fileINode.getBlocks();
-      out.writeInt(blocks.length);
-      for (Block blk : blocks)
-        blk.write(out);
-      filePerm.fromShort(fileINode.getFsPermissionShort());
-      PermissionStatus.write(out, fileINode.getUserName(),
-                             fileINode.getGroupName(),
-                             filePerm);
+      out.writeBoolean(false);
+      out.writeBoolean(node instanceof INodeDirectoryWithSnapshot);
+    }
+    
+    writePermissionStatus(node, out);
+  }
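
The two booleans written here are conditional: a single true marks a snapshottable directory, while false is always followed by a second flag marking a directory that merely carries snapshot data. The loader branch earlier in this commit mirrors that; a compact sketch of the read side (plain Java, hypothetical names):

    import java.io.ByteArrayInputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.IOException;

    class DirectorySnapshotFlagsSketch {
      enum DirKind { SNAPSHOTTABLE, WITH_SNAPSHOT, PLAIN }

      static DirKind readKind(DataInput in) throws IOException {
        if (in.readBoolean()) {
          return DirKind.SNAPSHOTTABLE;             // single true: snapshots may be taken here
        }
        return in.readBoolean() ? DirKind.WITH_SNAPSHOT   // false,true: carries snapshot diffs
                                : DirKind.PLAIN;          // false,false: ordinary directory
      }

      public static void main(String[] args) throws IOException {
        byte[] encoded = {0, 1};                    // false, then true
        DataInput in = new DataInputStream(new ByteArrayInputStream(encoded));
        System.out.println(readKind(in));           // prints WITH_SNAPSHOT
      }
    }
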
+  
+  /**
+   * Serialize a {@link INodeSymlink} node
+   * @param node The node to write
+   * @param out The {@link DataOutput} where the fields are written
+   */
+  private static void writeINodeSymlink(INodeSymlink node, DataOutput out)
+      throws IOException {
+    writeLocalName(node, out);
+    out.writeLong(node.getId());
+    out.writeShort(0);  // replication
+    out.writeLong(0);   // modification time
+    out.writeLong(0);   // access time
+    out.writeLong(0);   // preferred block size
+    out.writeInt(-2);   // # of blocks
+
+    Text.writeString(out, node.getSymlinkString());
+    writePermissionStatus(node, out);
+  }
+  
+  /** Serialize a {@link INodeReference} node */
+  private static void writeINodeReference(INodeReference ref, DataOutput out,
+      boolean writeUnderConstruction, ReferenceMap referenceMap
+      ) throws IOException {
+    writeLocalName(ref, out);
+    out.writeLong(ref.getId());
+    out.writeShort(0);  // replication
+    out.writeLong(0);   // modification time
+    out.writeLong(0);   // access time
+    out.writeLong(0);   // preferred block size
+    out.writeInt(-3);   // # of blocks
+
+    final boolean isWithName = ref instanceof INodeReference.WithName;
+    out.writeBoolean(isWithName);
+    
+    if (!isWithName) {
+      Preconditions.checkState(ref instanceof INodeReference.DstReference);
+      // dst snapshot id
+      out.writeInt(((INodeReference.DstReference) ref).getDstSnapshotId());
+    } else {
+      out.writeInt(((INodeReference.WithName) ref).getLastSnapshotId());
+    }
+    
+    final INodeReference.WithCount withCount
+        = (INodeReference.WithCount)ref.getReferredINode();
+    referenceMap.writeINodeReferenceWithCount(withCount, out,
+        writeUnderConstruction);
+  }
+
+  /**
+   * Save one inode's attributes to the image.
+   */
+  public static void saveINode2Image(INode node, DataOutput out,
+      boolean writeUnderConstruction, ReferenceMap referenceMap)
+      throws IOException {
+    if (node.isReference()) {
+      writeINodeReference(node.asReference(), out, writeUnderConstruction,
+          referenceMap);
+    } else if (node.isDirectory()) {
+      writeINodeDirectory(node.asDirectory(), out);
+    } else if (node.isSymlink()) {
+      writeINodeSymlink(node.asSymlink(), out);      
+    } else if (node.isFile()) {
+      writeINodeFile(node.asFile(), out, writeUnderConstruction);
     }
   }
 
@@ -202,19 +302,19 @@ public class FSImageSerialization {
   // code is moved into this package. This method should not be called
   // by other code.
   @SuppressWarnings("deprecation")
-  public static String readString(DataInputStream in) throws IOException {
+  public static String readString(DataInput in) throws IOException {
     DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
     ustr.readFields(in);
     return ustr.toStringChecked();
   }
 
-  static String readString_EmptyAsNull(DataInputStream in) throws IOException {
+  static String readString_EmptyAsNull(DataInput in) throws IOException {
     final String s = readString(in);
     return s.isEmpty()? null: s;
   }
 
   @SuppressWarnings("deprecation")
-  static void writeString(String str, DataOutputStream out) throws IOException {
+  public static void writeString(String str, DataOutput out) throws IOException {
     DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
     ustr.set(str);
     ustr.write(out);
@@ -222,7 +322,7 @@ public class FSImageSerialization {
 
   
   /** read the long value */
-  static long readLong(DataInputStream in) throws IOException {
+  static long readLong(DataInput in) throws IOException {
     LongWritable ustr = TL_DATA.get().U_LONG;
     ustr.readFields(in);
     return ustr.get();
@@ -236,7 +336,7 @@ public class FSImageSerialization {
   }
 
   /** read short value */
-  static short readShort(DataInputStream in) throws IOException {
+  static short readShort(DataInput in) throws IOException {
     ShortWritable uShort = TL_DATA.get().U_SHORT;
     uShort.readFields(in);
     return uShort.get();
@@ -251,7 +351,7 @@ public class FSImageSerialization {
   
   // Same comments apply for this method as for readString()
   @SuppressWarnings("deprecation")
-  public static byte[] readBytes(DataInputStream in) throws IOException {
+  public static byte[] readBytes(DataInput in) throws IOException {
     DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
     ustr.readFields(in);
     int len = ustr.getLength();
@@ -269,7 +369,7 @@ public class FSImageSerialization {
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  public static byte[][] readPathComponents(DataInputStream in)
+  public static byte[][] readPathComponents(DataInput in)
       throws IOException {
     DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
     
@@ -277,7 +377,19 @@ public class FSImageSerialization {
     return DFSUtil.bytes2byteArray(ustr.getBytes(),
       ustr.getLength(), (byte) Path.SEPARATOR_CHAR);
   }
+  
+  public static byte[] readLocalName(DataInput in) throws IOException {
+    byte[] createdNodeName = new byte[in.readShort()];
+    in.readFully(createdNodeName);
+    return createdNodeName;
+  }
 
+  private static void writeLocalName(INode inode, DataOutput out)
+      throws IOException {
+    final byte[] name = inode.getLocalNameBytes();
+    out.writeShort(name.length);
+    out.write(name);
+  }
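
readLocalName and writeLocalName are a symmetric pair: a short length prefix followed by the raw name bytes. A minimal round-trip check, independent of the INode classes, could look like:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    class LocalNameSketch {
      static void writeLocalName(byte[] name, DataOutput out) throws IOException {
        out.writeShort(name.length);       // length prefix
        out.write(name);                   // raw bytes, no terminator
      }

      static byte[] readLocalName(DataInput in) throws IOException {
        byte[] name = new byte[in.readShort()];
        in.readFully(name);
        return name;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeLocalName("mydir".getBytes(StandardCharsets.UTF_8),
            new DataOutputStream(bytes));
        byte[] back = readLocalName(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(new String(back, StandardCharsets.UTF_8));  // prints mydir
      }
    }
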
 
   /**
    * Write an array of blocks as compactly as possible. This uses
@@ -302,7 +414,7 @@ public class FSImageSerialization {
   }
   
   public static Block[] readCompactBlockArray(
-      DataInputStream in, int logVersion) throws IOException {
+      DataInput in, int logVersion) throws IOException {
     int num = WritableUtils.readVInt(in);
     if (num < 0) {
       throw new IOException("Invalid block array length: " + num);
@@ -320,4 +432,4 @@ public class FSImageSerialization {
     }
     return ret;
   }
-}
\ No newline at end of file
+}


