hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1568548 [1/3] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/security/token/delegation/ src/main/java...
Date: Sat, 15 Feb 2014 00:08:05 GMT
Author: arp
Date: Sat Feb 15 00:08:04 2014
New Revision: 1568548

URL: http://svn.apache.org/r1568548
Log:
HDFS-5698. Merge r1566359 from trunk to bring in the 'Use protobuf to serialize / deserialize FSImage' feature

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Feb 15 00:08:04 2014
@@ -69,6 +69,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5775. Consolidate the code for serialization in CacheManager
     (Haohui Mai via brandonli)
 
+    HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager
+    (Haohui Mai via brandonli)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@@ -152,6 +155,55 @@ Release 2.4.0 - UNRELEASED
     HDFS-5949. New Namenode UI when trying to download a file, the browser
     doesn't know the file name. (Haohui Mai via brandonli)
 
+  BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
+
+    HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5738. Serialize INode information in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5772. Serialize under-construction file information in FSImage. (jing9)
+
+    HDFS-5783. Compute the digest before loading FSImage. (Haohui Mai via jing9)
+
+    HDFS-5785. Serialize symlink in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5793. Optimize the serialization of PermissionStatus. (Haohui Mai via
+    jing9)
+
+    HDFS-5743. Use protobuf to serialize snapshot information. (jing9)
+
+    HDFS-5774. Serialize CachePool directives in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5744. Serialize information for token managers in protobuf. (Haohui Mai
+    via jing9)
+
+    HDFS-5824. Add a Type field in Snapshot DiffEntry's protobuf definition.
+    (jing9)
+
+    HDFS-5808. Implement cancellation when saving FSImage. (Haohui Mai via jing9)
+
+    HDFS-5826. Update the stored edit logs to be consistent with the changes in
+    HDFS-5698 branch. (Haohui Mai via jing9)
+
+    HDFS-5797. Implement offline image viewer. (Haohui Mai via jing9)
+
+    HDFS-5771. Track progress when loading fsimage. (Haohui Mai via cnauroth)
+
+    HDFS-5871. Use PBHelper to serialize CacheDirectiveInfoExpirationProto.
+    (Haohui Mai via jing9)
+
+    HDFS-5884. LoadDelegator should use IOUtils.readFully() to read the magic
+    header. (Haohui Mai via jing9)
+
+    HDFS-5885. Add annotation for repeated fields in the protobuf definition.
+    (Haohui Mai via jing9)
+
+    HDFS-5906. Fixing findbugs and javadoc warnings in the HDFS-5698 branch.
+    (Haohui Mai via jing9)
+
+    HDFS-5911. The id of a CacheDirective instance does not get serialized in
+    the protobuf-fsimage. (Haohui Mai via jing9)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Sat Feb 15 00:08:04 2014
@@ -9,6 +9,9 @@
        <Package name="org.apache.hadoop.hdfs.server.namenode.ha.proto" />
      </Match>
      <Match>
+       <Class name="~org.apache.hadoop.hdfs.server.namenode.FsImageProto.*" />
+     </Match>
+     <Match>
        <Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
      </Match>
      <Match>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml Sat Feb 15 00:08:04 2014
@@ -453,6 +453,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
                 <includes>
                   <include>ClientDatanodeProtocol.proto</include>
                   <include>DatanodeProtocol.proto</include>
+                  <include>fsimage.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Sat Feb 15 00:08:04 2014
@@ -139,7 +139,7 @@ elif [ "$COMMAND" = "balancer" ] ; then
 elif [ "$COMMAND" = "jmxget" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.JMXGet
 elif [ "$COMMAND" = "oiv" ] ; then
-  CLASS=org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer
+  CLASS=org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewerPB
 elif [ "$COMMAND" = "oev" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer
 elif [ "$COMMAND" = "fetchdt" ] ; then

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Sat Feb 15 00:08:04 2014
@@ -112,7 +112,8 @@ public class LayoutVersion {
     ADD_DATANODE_AND_STORAGE_UUIDS(-49, "Replace StorageID with DatanodeUuid."
         + " Use distinct StorageUuid per storage directory."),
     ADD_LAYOUT_FLAGS(-50, "Add support for layout flags."),
-    CACHING(-51, "Support for cache pools and path-based caching");
+    CACHING(-51, "Support for cache pools and path-based caching"),
+    PROTOBUF_FORMAT(-52, "Use protobuf to serialize FSImage");
 
     final int lv;
     final int ancestorLV;
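
The new PROTOBUF_FORMAT entry moves the layout version to -52; layout versions grow more negative as features are added, so images written at -52 or below are protobuf-encoded. A minimal sketch of testing for the feature through the existing LayoutVersion.supports() API:

    // Returns true: -52 is at or past the PROTOBUF_FORMAT version.
    LayoutVersion.supports(LayoutVersion.Feature.PROTOBUF_FORMAT, -52);
    // Returns false: -51 (CACHING) predates the protobuf format.
    LayoutVersion.supports(LayoutVersion.Feature.PROTOBUF_FORMAT, -51);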

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java Sat Feb 15 00:08:04 2014
@@ -23,12 +23,16 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -46,6 +50,10 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
 /**
  * A HDFS specific delegation token secret manager.
  * The secret manager is responsible for generating and accepting the password
@@ -167,7 +175,45 @@ public class DelegationTokenSecretManage
     }
     serializerCompat.load(in);
   }
-  
+
+  public static class SecretManagerState {
+    public final SecretManagerSection section;
+    public final List<SecretManagerSection.DelegationKey> keys;
+    public final List<SecretManagerSection.PersistToken> tokens;
+
+    public SecretManagerState(
+        SecretManagerSection s,
+        List<SecretManagerSection.DelegationKey> keys,
+        List<SecretManagerSection.PersistToken> tokens) {
+      this.section = s;
+      this.keys = keys;
+      this.tokens = tokens;
+    }
+  }
+
+  public synchronized void loadSecretManagerState(SecretManagerState state)
+      throws IOException {
+    Preconditions.checkState(!running,
+        "Can't load state from image in a running SecretManager.");
+
+    currentId = state.section.getCurrentId();
+    delegationTokenSequenceNumber = state.section.getTokenSequenceNumber();
+    for (SecretManagerSection.DelegationKey k : state.keys) {
+      addKey(new DelegationKey(k.getId(), k.getExpiryDate(), k.hasKey() ? k
+          .getKey().toByteArray() : null));
+    }
+
+    for (SecretManagerSection.PersistToken t : state.tokens) {
+      DelegationTokenIdentifier id = new DelegationTokenIdentifier(new Text(
+          t.getOwner()), new Text(t.getRenewer()), new Text(t.getRealUser()));
+      id.setIssueDate(t.getIssueDate());
+      id.setMaxDate(t.getMaxDate());
+      id.setSequenceNumber(t.getSequenceNumber());
+      id.setMasterKeyId(t.getMasterKeyId());
+      addPersistedDelegationToken(id, t.getExpiryDate());
+    }
+  }
+
   /**
    * Store the current state of the SecretManager for persistence
    * 
@@ -179,7 +225,43 @@ public class DelegationTokenSecretManage
       String sdPath) throws IOException {
     serializerCompat.save(out, sdPath);
   }
-  
+
+  public synchronized SecretManagerState saveSecretManagerState() {
+    SecretManagerSection s = SecretManagerSection.newBuilder()
+        .setCurrentId(currentId)
+        .setTokenSequenceNumber(delegationTokenSequenceNumber)
+        .setNumKeys(allKeys.size()).setNumTokens(currentTokens.size()).build();
+    ArrayList<SecretManagerSection.DelegationKey> keys = Lists
+        .newArrayListWithCapacity(allKeys.size());
+    ArrayList<SecretManagerSection.PersistToken> tokens = Lists
+        .newArrayListWithCapacity(currentTokens.size());
+
+    for (DelegationKey v : allKeys.values()) {
+      SecretManagerSection.DelegationKey.Builder b = SecretManagerSection.DelegationKey
+          .newBuilder().setId(v.getKeyId()).setExpiryDate(v.getExpiryDate());
+      if (v.getEncodedKey() != null) {
+        b.setKey(ByteString.copyFrom(v.getEncodedKey()));
+      }
+      keys.add(b.build());
+    }
+
+    for (Entry<DelegationTokenIdentifier, DelegationTokenInformation> e : currentTokens
+        .entrySet()) {
+      DelegationTokenIdentifier id = e.getKey();
+      SecretManagerSection.PersistToken.Builder b = SecretManagerSection.PersistToken
+          .newBuilder().setOwner(id.getOwner().toString())
+          .setRenewer(id.getRenewer().toString())
+          .setRealUser(id.getRealUser().toString())
+          .setIssueDate(id.getIssueDate()).setMaxDate(id.getMaxDate())
+          .setSequenceNumber(id.getSequenceNumber())
+          .setMasterKeyId(id.getMasterKeyId())
+          .setExpiryDate(e.getValue().getRenewDate());
+      tokens.add(b.build());
+    }
+
+    return new SecretManagerState(s, keys, tokens);
+  }
+
   /**
    * This method is intended to be used only while reading edit logs.
    * 
@@ -431,4 +513,5 @@ public class DelegationTokenSecretManage
       prog.endStep(Phase.LOADING_FSIMAGE, step);
     }
   }
+
 }
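
SecretManagerState bundles the SecretManagerSection header with its repeated key and token records, so the protobuf image code can hand the whole state around as one object. A usage sketch grounded in the methods above (the FSImageFormatProtobuf plumbing that actually writes the protos is elided):

    // Save side: snapshot the manager under its own lock. state.section
    // records currentId, tokenSequenceNumber and the record counts;
    // state.keys and state.tokens become delimited protobuf messages.
    DelegationTokenSecretManager.SecretManagerState state =
        secretManager.saveSecretManagerState();

    // Load side: only legal while the manager is stopped, per the
    // Preconditions.checkState(!running, ...) guard.
    secretManager.loadSecretManagerState(state);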

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Sat Feb 15 00:08:04 2014
@@ -50,8 +50,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -62,11 +64,15 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -81,6 +87,7 @@ import org.apache.hadoop.util.LightWeigh
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 /**
  * The Cache Manager handles caching on DataNodes.
@@ -167,6 +174,19 @@ public final class CacheManager {
    */
   private CacheReplicationMonitor monitor;
 
+  public static final class PersistState {
+    public final CacheManagerSection section;
+    public final List<CachePoolInfoProto> pools;
+    public final List<CacheDirectiveInfoProto> directives;
+
+    public PersistState(CacheManagerSection section,
+        List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) {
+      this.section = section;
+      this.pools = pools;
+      this.directives = directives;
+    }
+  }
+
   CacheManager(FSNamesystem namesystem, Configuration conf,
       BlockManager blockManager) {
     this.namesystem = namesystem;
@@ -944,6 +964,64 @@ public final class CacheManager {
     serializerCompat.save(out, sdPath);
   }
 
+  public PersistState saveState() throws IOException {
+    ArrayList<CachePoolInfoProto> pools = Lists
+        .newArrayListWithCapacity(cachePools.size());
+    ArrayList<CacheDirectiveInfoProto> directives = Lists
+        .newArrayListWithCapacity(directivesById.size());
+
+    for (CachePool pool : cachePools.values()) {
+      CachePoolInfo p = pool.getInfo(true);
+      CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder()
+          .setPoolName(p.getPoolName());
+
+      if (p.getOwnerName() != null)
+        b.setOwnerName(p.getOwnerName());
+
+      if (p.getGroupName() != null)
+        b.setGroupName(p.getGroupName());
+
+      if (p.getMode() != null)
+        b.setMode(p.getMode().toShort());
+
+      if (p.getLimit() != null)
+        b.setLimit(p.getLimit());
+
+      pools.add(b.build());
+    }
+
+    for (CacheDirective directive : directivesById.values()) {
+      CacheDirectiveInfo info = directive.toInfo();
+      CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder()
+          .setId(info.getId());
+
+      if (info.getPath() != null) {
+        b.setPath(info.getPath().toUri().getPath());
+      }
+
+      if (info.getReplication() != null) {
+        b.setReplication(info.getReplication());
+      }
+
+      if (info.getPool() != null) {
+        b.setPool(info.getPool());
+      }
+
+      Expiration expiry = info.getExpiration();
+      if (expiry != null) {
+        assert (!expiry.isRelative());
+        b.setExpiration(PBHelper.convert(expiry));
+      }
+
+      directives.add(b.build());
+    }
+    CacheManagerSection s = CacheManagerSection.newBuilder()
+        .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size())
+        .setNumDirectives(directives.size()).build();
+
+    return new PersistState(s, pools, directives);
+  }
+
   /**
    * Reloads CacheManager state from the passed DataInput. Used during namenode
    * startup to restore CacheManager state from an FSImage.
@@ -954,6 +1032,56 @@ public final class CacheManager {
     serializerCompat.load(in);
   }
 
+  public void loadState(PersistState s) throws IOException {
+    nextDirectiveId = s.section.getNextDirectiveId();
+    for (CachePoolInfoProto p : s.pools) {
+      CachePoolInfo info = new CachePoolInfo(p.getPoolName());
+      if (p.hasOwnerName())
+        info.setOwnerName(p.getOwnerName());
+
+      if (p.hasGroupName())
+        info.setGroupName(p.getGroupName());
+
+      if (p.hasMode())
+        info.setMode(new FsPermission((short) p.getMode()));
+
+      if (p.hasLimit())
+        info.setLimit(p.getLimit());
+
+      addCachePool(info);
+    }
+
+    for (CacheDirectiveInfoProto p : s.directives) {
+      // Get pool reference by looking it up in the map
+      final String poolName = p.getPool();
+      CacheDirective directive = new CacheDirective(p.getId(), new Path(
+          p.getPath()).toUri().getPath(), (short) p.getReplication(), p
+          .getExpiration().getMillis());
+      addCacheDirective(poolName, directive);
+    }
+  }
+
+  private void addCacheDirective(final String poolName,
+      final CacheDirective directive) throws IOException {
+    CachePool pool = cachePools.get(poolName);
+    if (pool == null) {
+      throw new IOException("Directive refers to pool " + poolName
+          + ", which does not exist.");
+    }
+    boolean addedDirective = pool.getDirectiveList().add(directive);
+    assert addedDirective;
+    if (directivesById.put(directive.getId(), directive) != null) {
+      throw new IOException("A directive with ID " + directive.getId()
+          + " already exists");
+    }
+    List<CacheDirective> directives = directivesByPath.get(directive.getPath());
+    if (directives == null) {
+      directives = new LinkedList<CacheDirective>();
+      directivesByPath.put(directive.getPath(), directives);
+    }
+    directives.add(directive);
+  }
+
   private final class SerializerCompat {
     private void save(DataOutputStream out, String sdPath) throws IOException {
       out.writeLong(nextDirectiveId);
@@ -1036,27 +1164,10 @@ public final class CacheManager {
         CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
         // Get pool reference by looking it up in the map
         final String poolName = info.getPool();
-        CachePool pool = cachePools.get(poolName);
-        if (pool == null) {
-          throw new IOException("Directive refers to pool " + poolName +
-              ", which does not exist.");
-        }
         CacheDirective directive =
             new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
                 info.getReplication(), info.getExpiration().getAbsoluteMillis());
-        boolean addedDirective = pool.getDirectiveList().add(directive);
-        assert addedDirective;
-        if (directivesById.put(directive.getId(), directive) != null) {
-          throw new IOException("A directive with ID " + directive.getId() +
-              " already exists");
-        }
-        List<CacheDirective> directives =
-            directivesByPath.get(directive.getPath());
-        if (directives == null) {
-          directives = new LinkedList<CacheDirective>();
-          directivesByPath.put(directive.getPath(), directives);
-        }
-        directives.add(directive);
+        addCacheDirective(poolName, directive);
         counter.increment();
       }
       prog.endStep(Phase.LOADING_FSIMAGE, step);
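
CacheManager follows the same save/load pattern: saveState() captures a CacheManagerSection plus repeated pool and directive protos, while loadState() rebuilds the in-memory maps through addCacheDirective(), the helper this change factors out of SerializerCompat so the legacy and protobuf loaders share one code path. A brief sketch:

    // Expirations are persisted as absolute times; the
    // assert (!expiry.isRelative()) above relies on that normalization.
    CacheManager.PersistState state = cacheManager.saveState();

    // On a freshly constructed CacheManager:
    cacheManager.loadState(state);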

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Feb 15 00:08:04 2014
@@ -827,8 +827,7 @@ public class FSImage implements Closeabl
    */
   private void loadFSImage(File curFile, MD5Hash expectedMd5,
       FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
-    FSImageFormat.Loader loader = new FSImageFormat.Loader(
-        conf, target);
+    FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
     loader.load(curFile);
     target.setBlockPoolId(this.getBlockPoolID());
 
@@ -857,7 +856,7 @@ public class FSImage implements Closeabl
     File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
     File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
     
-    FSImageFormat.Saver saver = new FSImageFormat.Saver(context);
+    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
     FSImageCompression compression = FSImageCompression.createCompression(conf);
     saver.save(newFile, compression);
     

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java Sat Feb 15 00:08:04 2014
@@ -57,6 +57,10 @@ class FSImageCompression {
     imageCodec = codec;
   }
 
+  public CompressionCodec getImageCodec() {
+    return imageCodec;
+  }
+
   /**
    * Create a "noop" compression - i.e. uncompressed
    */
@@ -89,7 +93,7 @@ class FSImageCompression {
    * Create a compression instance using the codec specified by
    * <code>codecClassName</code>
    */
-  private static FSImageCompression createCompression(Configuration conf,
+  static FSImageCompression createCompression(Configuration conf,
                                                       String codecClassName)
     throws IOException {
 

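Exposing getImageCodec() lets the protobuf saver compress each image section itself rather than having FSImageCompression wrap the entire stream. A hedged sketch of the intended use (the FileSummary "codec" field name comes from the new fsimage.proto and is an assumption here):

    CompressionCodec codec = compression.getImageCodec();
    if (codec != null) {
      // Record the codec in the FileSummary so the loader can undo the
      // compression, then wrap the per-section output stream.
      summary.setCodec(codec.getClass().getCanonicalName());
      sectionOut = codec.createOutputStream(sectionOut);
    }
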
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Sat Feb 15 00:08:04 2014
@@ -69,12 +69,13 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Contains inner classes for reading or writing the on-disk format for
@@ -181,16 +182,74 @@ import com.google.common.base.Preconditi
 @InterfaceStability.Evolving
 public class FSImageFormat {
   private static final Log LOG = FSImage.LOG;
-  
+
   // Static-only class
   private FSImageFormat() {}
-  
+
+  interface AbstractLoader {
+    MD5Hash getLoadedImageMd5();
+    long getLoadedImageTxId();
+  }
+
+  static class LoaderDelegator implements AbstractLoader {
+    private AbstractLoader impl;
+    private final Configuration conf;
+    private final FSNamesystem fsn;
+
+    LoaderDelegator(Configuration conf, FSNamesystem fsn) {
+      this.conf = conf;
+      this.fsn = fsn;
+    }
+
+    @Override
+    public MD5Hash getLoadedImageMd5() {
+      return impl.getLoadedImageMd5();
+    }
+
+    @Override
+    public long getLoadedImageTxId() {
+      return impl.getLoadedImageTxId();
+    }
+
+    public void load(File file) throws IOException {
+      Preconditions.checkState(impl == null, "Image already loaded!");
+
+      FileInputStream is = null;
+      try {
+        is = new FileInputStream(file);
+        byte[] magic = new byte[FSImageUtil.MAGIC_HEADER.length];
+        IOUtils.readFully(is, magic, 0, magic.length);
+        if (Arrays.equals(magic, FSImageUtil.MAGIC_HEADER)) {
+          FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
+              conf, fsn);
+          impl = loader;
+          loader.load(file);
+        } else {
+          Loader loader = new Loader(conf, fsn);
+          impl = loader;
+          loader.load(file);
+        }
+
+      } finally {
+        IOUtils.cleanup(LOG, is);
+      }
+    }
+  }
+
+  /**
+   * Construct a loader class to load the image. It chooses the loader based on
+   * the layout version.
+   */
+  public static LoaderDelegator newLoader(Configuration conf, FSNamesystem fsn) {
+    return new LoaderDelegator(conf, fsn);
+  }
+
   /**
    * A one-shot class responsible for loading an image. The load() function
    * should be called once, after which the getter methods may be used to retrieve
    * information about the image that was loaded, if loading was successful.
    */
-  public static class Loader {
+  public static class Loader implements AbstractLoader {
     private final Configuration conf;
     /** which namesystem this loader is working for */
     private final FSNamesystem namesystem;
@@ -215,12 +274,14 @@ public class FSImageFormat {
      * Return the MD5 checksum of the image that has been loaded.
      * @throws IllegalStateException if load() has not yet been called.
      */
-    MD5Hash getLoadedImageMd5() {
+    @Override
+    public MD5Hash getLoadedImageMd5() {
       checkLoaded();
       return imgDigest;
     }
 
-    long getLoadedImageTxId() {
+    @Override
+    public long getLoadedImageTxId() {
       checkLoaded();
       return imgTxId;
     }
@@ -243,7 +304,7 @@ public class FSImageFormat {
       }
     }
 
-    void load(File curFile) throws IOException {
+    public void load(File curFile) throws IOException {
       checkNotLoaded();
       assert curFile != null : "curFile is null";
 

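LoaderDelegator chooses a loader by probing the first bytes of the image file. The probe is unambiguous: the legacy format opens with the 4-byte layout version, a negative int, while the protobuf format opens with FSImageUtil.MAGIC_HEADER, added by this commit (believed to be the 8-byte ASCII tag "HDFSIMG1", an assumption here). A self-contained sketch of the same check:

    // Hedged sketch; assumes the "HDFSIMG1" magic value.
    static boolean isProtobufImage(File file) throws IOException {
      byte[] expected = "HDFSIMG1".getBytes("US-ASCII");
      byte[] magic = new byte[expected.length];
      FileInputStream in = new FileInputStream(file);
      try {
        org.apache.hadoop.io.IOUtils.readFully(in, magic, 0, magic.length);
      } finally {
        in.close();
      }
      return java.util.Arrays.equals(magic, expected);
    }
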
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java?rev=1568548&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java Sat Feb 15 00:08:04 2014
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.StringMap;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+@InterfaceAudience.Private
+public final class FSImageFormatPBINode {
+  private final static long USER_GROUP_STRID_MASK = (1 << 24) - 1;
+  private final static int USER_STRID_OFFSET = 40;
+  private final static int GROUP_STRID_OFFSET = 16;
+  private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
+
+  public final static class Loader {
+    public static PermissionStatus loadPermission(long id,
+        final String[] stringTable) {
+      short perm = (short) (id & ((1 << GROUP_STRID_OFFSET) - 1));
+      int gsid = (int) ((id >> GROUP_STRID_OFFSET) & USER_GROUP_STRID_MASK);
+      int usid = (int) ((id >> USER_STRID_OFFSET) & USER_GROUP_STRID_MASK);
+      return new PermissionStatus(stringTable[usid], stringTable[gsid],
+          new FsPermission(perm));
+    }
+
+    public static INodeReference loadINodeReference(
+        INodeSection.INodeReference r, FSDirectory dir) throws IOException {
+      long referredId = r.getReferredId();
+      INode referred = dir.getInode(referredId);
+      WithCount withCount = (WithCount) referred.getParentReference();
+      if (withCount == null) {
+        withCount = new INodeReference.WithCount(null, referred);
+      }
+      final INodeReference ref;
+      if (r.hasDstSnapshotId()) { // DstReference
+        ref = new INodeReference.DstReference(null, withCount,
+            r.getDstSnapshotId());
+      } else {
+        ref = new INodeReference.WithName(null, withCount, r.getName()
+            .toByteArray(), r.getLastSnapshotId());
+      }
+      return ref;
+    }
+
+    public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
+        final String[] stringTable) {
+      assert n.getType() == INodeSection.INode.Type.DIRECTORY;
+      INodeSection.INodeDirectory d = n.getDirectory();
+
+      final PermissionStatus permissions = loadPermission(d.getPermission(),
+          stringTable);
+      final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
+          .toByteArray(), permissions, d.getModificationTime());
+
+      final long nsQuota = d.getNsQuota(), dsQuota = d.getDsQuota();
+      if (nsQuota >= 0 || dsQuota >= 0) {
+        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+      }
+      return dir;
+    }
+
+    public static void updateBlocksMap(INodeFile file, BlockManager bm) {
+      // Add file->block mapping
+      final BlockInfo[] blocks = file.getBlocks();
+      if (blocks != null) {
+        for (int i = 0; i < blocks.length; i++) {
+          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+        }
+      }
+    }
+
+    private final FSDirectory dir;
+    private final FSNamesystem fsn;
+    private final FSImageFormatProtobuf.Loader parent;
+
+    Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
+      this.fsn = fsn;
+      this.dir = fsn.dir;
+      this.parent = parent;
+    }
+
+    void loadINodeDirectorySection(InputStream in) throws IOException {
+      while (true) {
+        INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
+            .parseDelimitedFrom(in);
+        // note that in is a LimitedInputStream
+        if (e == null) {
+          break;
+        }
+        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
+        for (long id : e.getChildrenList()) {
+          INode child = dir.getInode(id);
+          addToParent(p, child);
+        }
+        for (int i = 0; i < e.getNumOfRef(); i++) {
+          INodeReference ref = loadINodeReference(in);
+          addToParent(p, ref);
+        }
+      }
+    }
+
+    private INodeReference loadINodeReference(InputStream in)
+        throws IOException {
+      INodeSection.INodeReference ref = INodeSection.INodeReference
+          .parseDelimitedFrom(in);
+      return loadINodeReference(ref, dir);
+    }
+
+    void loadINodeSection(InputStream in) throws IOException {
+      INodeSection s = INodeSection.parseDelimitedFrom(in);
+      fsn.resetLastInodeId(s.getLastInodeId());
+      LOG.info("Loading " + s.getNumInodes() + " INodes.");
+      for (int i = 0; i < s.getNumInodes(); ++i) {
+        INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+        if (p.getId() == INodeId.ROOT_INODE_ID) {
+          loadRootINode(p);
+        } else {
+          INode n = loadINode(p);
+          dir.addToInodeMap(n);
+        }
+      }
+    }
+
+    /**
+     * Load the under-construction files section, and update the lease map
+     */
+    void loadFilesUnderConstructionSection(InputStream in) throws IOException {
+      while (true) {
+        FileUnderConstructionEntry entry = FileUnderConstructionEntry
+            .parseDelimitedFrom(in);
+        if (entry == null) {
+          break;
+        }
+        // update the lease manager
+        INodeFile file = dir.getInode(entry.getInodeId()).asFile();
+        FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
+        Preconditions.checkState(uc != null); // file must be under-construction
+        fsn.leaseManager.addLease(uc.getClientName(), entry.getFullPath());
+      }
+    }
+
+    private void addToParent(INodeDirectory parent, INode child) {
+      if (parent == dir.rootDir && FSDirectory.isReservedName(child)) {
+        throw new HadoopIllegalArgumentException("File name \""
+            + child.getLocalName() + "\" is reserved. Please "
+            + " change the name of the existing file or directory to another "
+            + "name before upgrading to this release.");
+      }
+      // NOTE: This does not update space counts for parents
+      if (!parent.addChild(child)) {
+        return;
+      }
+      dir.cacheName(child);
+
+      if (child.isFile()) {
+        updateBlocksMap(child.asFile(), fsn.getBlockManager());
+      }
+    }
+
+    private INode loadINode(INodeSection.INode n) {
+      switch (n.getType()) {
+      case FILE:
+        return loadINodeFile(n);
+      case DIRECTORY:
+        return loadINodeDirectory(n, parent.getStringTable());
+      case SYMLINK:
+        return loadINodeSymlink(n);
+      default:
+        break;
+      }
+      return null;
+    }
+
+    private INodeFile loadINodeFile(INodeSection.INode n) {
+      assert n.getType() == INodeSection.INode.Type.FILE;
+      INodeSection.INodeFile f = n.getFile();
+      List<BlockProto> bp = f.getBlocksList();
+      short replication = (short) f.getReplication();
+
+      BlockInfo[] blocks = new BlockInfo[bp.size()];
+      for (int i = 0, e = bp.size(); i < e; ++i) {
+        blocks[i] = new BlockInfo(PBHelper.convert(bp.get(i)), replication);
+      }
+      final PermissionStatus permissions = loadPermission(f.getPermission(),
+          parent.getStringTable());
+
+      final INodeFile file = new INodeFile(n.getId(),
+          n.getName().toByteArray(), permissions, f.getModificationTime(),
+          f.getAccessTime(), blocks, replication, f.getPreferredBlockSize());
+      // under-construction information
+      if (f.hasFileUC()) {
+        INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
+        file.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
+            null);
+        if (blocks.length > 0) {
+          BlockInfo lastBlk = file.getLastBlock();
+          // replace the last block of file
+          file.setBlock(file.numBlocks() - 1, new BlockInfoUnderConstruction(
+              lastBlk, replication));
+        }
+      }
+      return file;
+    }
+
+
+    private INodeSymlink loadINodeSymlink(INodeSection.INode n) {
+      assert n.getType() == INodeSection.INode.Type.SYMLINK;
+      INodeSection.INodeSymlink s = n.getSymlink();
+      final PermissionStatus permissions = loadPermission(s.getPermission(),
+          parent.getStringTable());
+      return new INodeSymlink(n.getId(), n.getName().toByteArray(), permissions,
+          0, 0, s.getTarget().toStringUtf8());
+    }
+
+    private void loadRootINode(INodeSection.INode p) {
+      INodeDirectory root = loadINodeDirectory(p, parent.getStringTable());
+      final Quota.Counts q = root.getQuotaCounts();
+      final long nsQuota = q.get(Quota.NAMESPACE);
+      final long dsQuota = q.get(Quota.DISKSPACE);
+      if (nsQuota != -1 || dsQuota != -1) {
+        dir.rootDir.getDirectoryWithQuotaFeature().setQuota(nsQuota, dsQuota);
+      }
+      dir.rootDir.cloneModificationTime(root);
+      dir.rootDir.clonePermissionStatus(root);
+    }
+  }
+
+  public final static class Saver {
+    private static long buildPermissionStatus(INodeAttributes n,
+        final StringMap stringMap) {
+      long userId = stringMap.getStringId(n.getUserName());
+      long groupId = stringMap.getStringId(n.getGroupName());
+      return ((userId & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)
+          | ((groupId & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)
+          | n.getFsPermissionShort();
+    }
+
+    public static INodeSection.INodeFile.Builder buildINodeFile(
+        INodeFileAttributes file, final StringMap stringMap) {
+      INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
+          .setAccessTime(file.getAccessTime())
+          .setModificationTime(file.getModificationTime())
+          .setPermission(buildPermissionStatus(file, stringMap))
+          .setPreferredBlockSize(file.getPreferredBlockSize())
+          .setReplication(file.getFileReplication());
+      return b;
+    }
+
+    public static INodeSection.INodeDirectory.Builder buildINodeDirectory(
+        INodeDirectoryAttributes dir, final StringMap stringMap) {
+      Quota.Counts quota = dir.getQuotaCounts();
+      INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
+          .newBuilder().setModificationTime(dir.getModificationTime())
+          .setNsQuota(quota.get(Quota.NAMESPACE))
+          .setDsQuota(quota.get(Quota.DISKSPACE))
+          .setPermission(buildPermissionStatus(dir, stringMap));
+      return b;
+    }
+
+    public static INodeSection.INodeReference.Builder buildINodeReference(
+        INodeReference ref) throws IOException {
+      INodeSection.INodeReference.Builder rb = INodeSection.INodeReference
+          .newBuilder().setReferredId(ref.getId());
+      if (ref instanceof WithName) {
+        rb.setLastSnapshotId(((WithName) ref).getLastSnapshotId()).setName(
+            ByteString.copyFrom(ref.getLocalNameBytes()));
+      } else if (ref instanceof DstReference) {
+        rb.setDstSnapshotId(((DstReference) ref).getDstSnapshotId());
+      }
+      return rb;
+    }
+
+    private final FSNamesystem fsn;
+    private final FileSummary.Builder summary;
+    private final SaveNamespaceContext context;
+    private final FSImageFormatProtobuf.Saver parent;
+
+    Saver(FSImageFormatProtobuf.Saver parent, FileSummary.Builder summary) {
+      this.parent = parent;
+      this.summary = summary;
+      this.context = parent.getContext();
+      this.fsn = context.getSourceNamesystem();
+    }
+
+    void serializeINodeDirectorySection(OutputStream out) throws IOException {
+      Iterator<INodeWithAdditionalFields> iter = fsn.getFSDirectory()
+          .getINodeMap().getMapIterator();
+      int i = 0;
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields n = iter.next();
+        if (!n.isDirectory()) {
+          continue;
+        }
+
+        ReadOnlyList<INode> children = n.asDirectory().getChildrenList(
+            Snapshot.CURRENT_STATE_ID);
+        if (children.size() > 0) {
+          INodeDirectorySection.DirEntry.Builder b = INodeDirectorySection.
+              DirEntry.newBuilder().setParent(n.getId());
+          List<INodeReference> refs = new ArrayList<INodeReference>();
+          for (INode inode : children) {
+            if (!inode.isReference()) {
+              b.addChildren(inode.getId());
+            } else {
+              refs.add(inode.asReference());
+            }
+          }
+          b.setNumOfRef(refs.size());
+          INodeDirectorySection.DirEntry e = b.build();
+          e.writeDelimitedTo(out);
+          for (INodeReference ref : refs) {
+            INodeSection.INodeReference.Builder rb = buildINodeReference(ref);
+            rb.build().writeDelimitedTo(out);
+          }
+        }
+
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
+      }
+      parent.commitSection(summary,
+          FSImageFormatProtobuf.SectionName.INODE_DIR);
+    }
+
+    void serializeINodeSection(OutputStream out) throws IOException {
+      INodeMap inodesMap = fsn.dir.getINodeMap();
+
+      INodeSection.Builder b = INodeSection.newBuilder()
+          .setLastInodeId(fsn.getLastInodeId()).setNumInodes(inodesMap.size());
+      INodeSection s = b.build();
+      s.writeDelimitedTo(out);
+
+      int i = 0;
+      Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields n = iter.next();
+        save(out, n);
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
+      }
+      parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
+    }
+
+    void serializeFilesUCSection(OutputStream out) throws IOException {
+      Map<String, INodeFile> ucMap = fsn.getFilesUnderConstruction();
+      for (Map.Entry<String, INodeFile> entry : ucMap.entrySet()) {
+        String path = entry.getKey();
+        INodeFile file = entry.getValue();
+        FileUnderConstructionEntry.Builder b = FileUnderConstructionEntry
+            .newBuilder().setInodeId(file.getId()).setFullPath(path);
+        FileUnderConstructionEntry e = b.build();
+        e.writeDelimitedTo(out);
+      }
+      parent.commitSection(summary,
+          FSImageFormatProtobuf.SectionName.FILES_UNDERCONSTRUCTION);
+    }
+
+    private void save(OutputStream out, INode n) throws IOException {
+      if (n.isDirectory()) {
+        save(out, n.asDirectory());
+      } else if (n.isFile()) {
+        save(out, n.asFile());
+      } else if (n.isSymlink()) {
+        save(out, n.asSymlink());
+      }
+    }
+
+    private void save(OutputStream out, INodeDirectory n) throws IOException {
+      INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
+          parent.getStringMap());
+      INodeSection.INode r = buildINodeCommon(n)
+          .setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
+      r.writeDelimitedTo(out);
+    }
+
+    private void save(OutputStream out, INodeFile n) throws IOException {
+      INodeSection.INodeFile.Builder b = buildINodeFile(n,
+          parent.getStringMap());
+
+      for (Block block : n.getBlocks()) {
+        b.addBlocks(PBHelper.convert(block));
+      }
+
+      FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
+      if (uc != null) {
+        INodeSection.FileUnderConstructionFeature f =
+            INodeSection.FileUnderConstructionFeature
+            .newBuilder().setClientName(uc.getClientName())
+            .setClientMachine(uc.getClientMachine()).build();
+        b.setFileUC(f);
+      }
+
+      INodeSection.INode r = buildINodeCommon(n)
+          .setType(INodeSection.INode.Type.FILE).setFile(b).build();
+      r.writeDelimitedTo(out);
+    }
+
+    private void save(OutputStream out, INodeSymlink n) throws IOException {
+      INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
+          .newBuilder()
+          .setPermission(buildPermissionStatus(n, parent.getStringMap()))
+          .setTarget(ByteString.copyFrom(n.getSymlink()));
+      INodeSection.INode r = buildINodeCommon(n)
+          .setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();
+      r.writeDelimitedTo(out);
+    }
+
+    private final INodeSection.INode.Builder buildINodeCommon(INode n) {
+      return INodeSection.INode.newBuilder()
+          .setId(n.getId())
+          .setName(ByteString.copyFrom(n.getLocalNameBytes()));
+    }
+  }
+
+  private FSImageFormatPBINode() {
+  }
+}
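
FSImageFormatPBINode packs a PermissionStatus into one long: bits 0-15 carry the FsPermission short, bits 16-39 the group's string-table id, and bits 40-63 the user's string-table id, with the 24-bit USER_GROUP_STRID_MASK bounding both ids. A worked example using the constants from the file above:

    // userId = 1, groupId = 2, perm = 0755 (octal)
    long encoded = ((1L & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)    // 1 << 40
                 | ((2L & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)   // 2 << 16
                 | 0755;
    // Decoding reverses the shifts, exactly as Loader.loadPermission() does:
    short perm = (short) (encoded & ((1 << GROUP_STRID_OFFSET) - 1));     // 0755
    int gsid = (int) ((encoded >> GROUP_STRID_OFFSET) & USER_GROUP_STRID_MASK); // 2
    int usid = (int) ((encoded >> USER_STRID_OFFSET) & USER_GROUP_STRID_MASK);  // 1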

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java?rev=1568548&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java Sat Feb 15 00:08:04 2014
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressorStream;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.LimitInputStream;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * Utility class to read / write fsimage in protobuf format.
+ */
+@InterfaceAudience.Private
+public final class FSImageFormatProtobuf {
+  private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
+
+  public static final class Loader implements FSImageFormat.AbstractLoader {
+    static final int MINIMUM_FILE_LENGTH = 8;
+    private final Configuration conf;
+    private final FSNamesystem fsn;
+
+    private String[] stringTable;
+
+    /** The MD5 sum of the loaded file */
+    private MD5Hash imgDigest;
+    /** The transaction ID of the last edit represented by the loaded file */
+    private long imgTxId;
+
+    Loader(Configuration conf, FSNamesystem fsn) {
+      this.conf = conf;
+      this.fsn = fsn;
+    }
+
+    @Override
+    public MD5Hash getLoadedImageMd5() {
+      return imgDigest;
+    }
+
+    @Override
+    public long getLoadedImageTxId() {
+      return imgTxId;
+    }
+
+    public String[] getStringTable() {
+      return stringTable;
+    }
+
+    void load(File file) throws IOException {
+      long start = System.currentTimeMillis();
+      imgDigest = MD5FileUtils.computeMd5ForFile(file);
+      RandomAccessFile raFile = new RandomAccessFile(file, "r");
+      FileInputStream fin = new FileInputStream(file);
+      try {
+        loadInternal(raFile, fin);
+        long end = System.currentTimeMillis();
+        LOG.info("Loaded FSImage in " + (end - start) / 1000 + " seconds.");
+      } finally {
+        fin.close();
+        raFile.close();
+      }
+    }
+
+    private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
+        throws IOException {
+      if (!FSImageUtil.checkFileFormat(raFile)) {
+        throw new IOException("Unrecognized file format");
+      }
+      FileSummary summary = FSImageUtil.loadSummary(raFile);
+
+      FileChannel channel = fin.getChannel();
+
+      FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
+          fsn, this);
+      FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader(
+          fsn, this);
+
+      ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
+          .getSectionsList());
+      Collections.sort(sections, new Comparator<FileSummary.Section>() {
+        @Override
+        public int compare(FileSummary.Section s1, FileSummary.Section s2) {
+          SectionName n1 = SectionName.fromString(s1.getName());
+          SectionName n2 = SectionName.fromString(s2.getName());
+          if (n1 == null) {
+            return n2 == null ? 0 : -1;
+          } else if (n2 == null) {
+            // return 1 (not -1) so the comparator stays antisymmetric
+            return 1;
+          } else {
+            return n1.ordinal() - n2.ordinal();
+          }
+        }
+      });
+
+      StartupProgress prog = NameNode.getStartupProgress();
+      /*
+       * The beginStep() and endStep() calls do not match the section
+       * boundaries because the current implementation only allows a
+       * particular step to be started once.
+       */
+      Step currentStep = null;
+
+      for (FileSummary.Section s : sections) {
+        channel.position(s.getOffset());
+        InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+            s.getLength()));
+
+        in = FSImageUtil.wrapInputStreamForCompression(conf,
+            summary.getCodec(), in);
+
+        String n = s.getName();
+        SectionName sectionName = SectionName.fromString(n);
+        if (sectionName == null) {
+          // guard: a switch on a null enum value would throw an NPE,
+          // bypassing the default case below
+          LOG.warn("Unrecognized section " + n);
+          continue;
+        }
+
+        switch (sectionName) {
+        case NS_INFO:
+          loadNameSystemSection(in);
+          break;
+        case STRING_TABLE:
+          loadStringTableSection(in);
+          break;
+        case INODE: {
+          currentStep = new Step(StepType.INODES);
+          prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
+          inodeLoader.loadINodeSection(in);
+        }
+          break;
+        case INODE_DIR:
+          inodeLoader.loadINodeDirectorySection(in);
+          break;
+        case FILES_UNDERCONSTRUCTION:
+          inodeLoader.loadFilesUnderConstructionSection(in);
+          break;
+        case SNAPSHOT:
+          snapshotLoader.loadSnapshotSection(in);
+          break;
+        case SNAPSHOT_DIFF:
+          snapshotLoader.loadSnapshotDiffSection(in);
+          break;
+        case SECRET_MANAGER: {
+          prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
+          Step step = new Step(StepType.DELEGATION_TOKENS);
+          prog.beginStep(Phase.LOADING_FSIMAGE, step);
+          loadSecretManagerSection(in);
+          prog.endStep(Phase.LOADING_FSIMAGE, step);
+        }
+          break;
+        case CACHE_MANAGER: {
+          Step step = new Step(StepType.CACHE_POOLS);
+          prog.beginStep(Phase.LOADING_FSIMAGE, step);
+          loadCacheManagerSection(in);
+          prog.endStep(Phase.LOADING_FSIMAGE, step);
+        }
+          break;
+        default:
+          LOG.warn("Unrecognized section " + n);
+          break;
+        }
+      }
+    }
+
+    private void loadNameSystemSection(InputStream in) throws IOException {
+      NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
+      fsn.setGenerationStampV1(s.getGenstampV1());
+      fsn.setGenerationStampV2(s.getGenstampV2());
+      fsn.setGenerationStampV1Limit(s.getGenstampV1Limit());
+      fsn.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
+      imgTxId = s.getTransactionId();
+    }
+
+    private void loadStringTableSection(InputStream in) throws IOException {
+      StringTableSection s = StringTableSection.parseDelimitedFrom(in);
+      stringTable = new String[s.getNumEntry() + 1];
+      for (int i = 0; i < s.getNumEntry(); ++i) {
+        StringTableSection.Entry e = StringTableSection.Entry
+            .parseDelimitedFrom(in);
+        stringTable[e.getId()] = e.getStr();
+      }
+    }
+
+    private void loadSecretManagerSection(InputStream in) throws IOException {
+      SecretManagerSection s = SecretManagerSection.parseDelimitedFrom(in);
+      int numKeys = s.getNumKeys(), numTokens = s.getNumTokens();
+      ArrayList<SecretManagerSection.DelegationKey> keys = Lists
+          .newArrayListWithCapacity(numKeys);
+      ArrayList<SecretManagerSection.PersistToken> tokens = Lists
+          .newArrayListWithCapacity(numTokens);
+
+      for (int i = 0; i < numKeys; ++i)
+        keys.add(SecretManagerSection.DelegationKey.parseDelimitedFrom(in));
+
+      for (int i = 0; i < numTokens; ++i)
+        tokens.add(SecretManagerSection.PersistToken.parseDelimitedFrom(in));
+
+      fsn.loadSecretManagerState(s, keys, tokens);
+    }
+
+    private void loadCacheManagerSection(InputStream in) throws IOException {
+      CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in);
+      ArrayList<CachePoolInfoProto> pools = Lists.newArrayListWithCapacity(s
+          .getNumPools());
+      ArrayList<CacheDirectiveInfoProto> directives = Lists
+          .newArrayListWithCapacity(s.getNumDirectives());
+      for (int i = 0; i < s.getNumPools(); ++i)
+        pools.add(CachePoolInfoProto.parseDelimitedFrom(in));
+      for (int i = 0; i < s.getNumDirectives(); ++i)
+        directives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in));
+      fsn.getCacheManager().loadState(
+          new CacheManager.PersistState(s, pools, directives));
+    }
+
+  }
+
+  public static final class Saver {
+    private final SaveNamespaceContext context;
+    private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
+    private MD5Hash savedDigest;
+    private StringMap stringMap = new StringMap();
+
+    private FileChannel fileChannel;
+    // OutputStream for the section data
+    private OutputStream sectionOutputStream;
+    private CompressionCodec codec;
+    private OutputStream underlyingOutputStream;
+    public static final int CHECK_CANCEL_INTERVAL = 4096;
+
+    Saver(SaveNamespaceContext context) {
+      this.context = context;
+    }
+
+    public MD5Hash getSavedDigest() {
+      return savedDigest;
+    }
+
+    public SaveNamespaceContext getContext() {
+      return context;
+    }
+
+    public void commitSection(FileSummary.Builder summary, SectionName name)
+        throws IOException {
+      long oldOffset = currentOffset;
+      flushSectionOutputStream();
+
+      if (codec != null) {
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+      } else {
+        sectionOutputStream = underlyingOutputStream;
+      }
+      long length = fileChannel.position() - oldOffset;
+      summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
+          .setLength(length).setOffset(currentOffset));
+      currentOffset += length;
+    }
+
+    private void flushSectionOutputStream() throws IOException {
+      if (codec != null) {
+        ((CompressorStream) sectionOutputStream).finish();
+      }
+      sectionOutputStream.flush();
+    }
+
+    void save(File file, FSImageCompression compression) throws IOException {
+      FileOutputStream fout = new FileOutputStream(file);
+      fileChannel = fout.getChannel();
+      try {
+        saveInternal(fout, compression, file.getAbsolutePath());
+      } finally {
+        fout.close();
+      }
+    }
+
+    private static void saveFileSummary(OutputStream out, FileSummary summary)
+        throws IOException {
+      summary.writeDelimitedTo(out);
+      int length = getOndiskTrunkSize(summary);
+      byte[] lengthBytes = new byte[4];
+      ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
+      out.write(lengthBytes);
+    }
+
+    private void saveInodes(FileSummary.Builder summary) throws IOException {
+      FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,
+          summary);
+
+      saver.serializeINodeSection(sectionOutputStream);
+      saver.serializeINodeDirectorySection(sectionOutputStream);
+      saver.serializeFilesUCSection(sectionOutputStream);
+    }
+
+    private void saveSnapshots(FileSummary.Builder summary) throws IOException {
+      FSImageFormatPBSnapshot.Saver snapshotSaver = new FSImageFormatPBSnapshot.Saver(
+          this, summary, context, context.getSourceNamesystem());
+
+      snapshotSaver.serializeSnapshotSection(sectionOutputStream);
+      snapshotSaver.serializeSnapshotDiffSection(sectionOutputStream);
+    }
+
+    private void saveInternal(FileOutputStream fout,
+        FSImageCompression compression, String filePath) throws IOException {
+      StartupProgress prog = NameNode.getStartupProgress();
+      MessageDigest digester = MD5Hash.getDigester();
+
+      underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(
+          fout), digester);
+      underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);
+
+      fileChannel = fout.getChannel();
+
+      FileSummary.Builder b = FileSummary.newBuilder()
+          .setOndiskVersion(FSImageUtil.FILE_VERSION)
+          .setLayoutVersion(LayoutVersion.getCurrentLayoutVersion());
+
+      codec = compression.getImageCodec();
+      if (codec != null) {
+        b.setCodec(codec.getClass().getCanonicalName());
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+      } else {
+        sectionOutputStream = underlyingOutputStream;
+      }
+
+      saveNameSystemSection(b);
+      // Check for cancellation right after serializing the name system section.
+      // Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace,
+      // depend on this behavior.
+      context.checkCancelled();
+
+      Step step = new Step(StepType.INODES, filePath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      saveInodes(b);
+      saveSnapshots(b);
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+
+      step = new Step(StepType.DELEGATION_TOKENS, filePath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      saveSecretManagerSection(b);
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+
+      step = new Step(StepType.CACHE_POOLS, filePath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      saveCacheManagerSection(b);
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+
+      saveStringTableSection(b);
+
+      // We use the underlyingOutputStream to write the header. Therefore flush
+      // the buffered stream (which is potentially compressed) first.
+      flushSectionOutputStream();
+
+      FileSummary summary = b.build();
+      saveFileSummary(underlyingOutputStream, summary);
+      underlyingOutputStream.close();
+      savedDigest = new MD5Hash(digester.digest());
+    }
+
+    private void saveSecretManagerSection(FileSummary.Builder summary)
+        throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      DelegationTokenSecretManager.SecretManagerState state = fsn
+          .saveSecretManagerState();
+      state.section.writeDelimitedTo(sectionOutputStream);
+      for (SecretManagerSection.DelegationKey k : state.keys)
+        k.writeDelimitedTo(sectionOutputStream);
+
+      for (SecretManagerSection.PersistToken t : state.tokens)
+        t.writeDelimitedTo(sectionOutputStream);
+
+      commitSection(summary, SectionName.SECRET_MANAGER);
+    }
+
+    private void saveCacheManagerSection(FileSummary.Builder summary)
+        throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      CacheManager.PersistState state = fsn.getCacheManager().saveState();
+      state.section.writeDelimitedTo(sectionOutputStream);
+
+      for (CachePoolInfoProto p : state.pools)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      for (CacheDirectiveInfoProto p : state.directives)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      commitSection(summary, SectionName.CACHE_MANAGER);
+    }
+
+    private void saveNameSystemSection(FileSummary.Builder summary)
+        throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      OutputStream out = sectionOutputStream;
+      NameSystemSection.Builder b = NameSystemSection.newBuilder()
+          .setGenstampV1(fsn.getGenerationStampV1())
+          .setGenstampV1Limit(fsn.getGenerationStampV1Limit())
+          .setGenstampV2(fsn.getGenerationStampV2())
+          .setLastAllocatedBlockId(fsn.getLastAllocatedBlockId())
+          .setTransactionId(context.getTxId());
+
+      // We use the non-locked version of getNamespaceInfo here since
+      // the coordinating thread of saveNamespace already has read-locked
+      // the namespace for us. If we attempt to take another readlock
+      // from the actual saver thread, there is a potential for a
+      // fairness-related deadlock. See the comments on HDFS-2223.
+      b.setNamespaceId(fsn.unprotectedGetNamespaceInfo().getNamespaceID());
+      NameSystemSection s = b.build();
+      s.writeDelimitedTo(out);
+
+      commitSection(summary, SectionName.NS_INFO);
+    }
+
+    private void saveStringTableSection(FileSummary.Builder summary)
+        throws IOException {
+      OutputStream out = sectionOutputStream;
+      StringTableSection.Builder b = StringTableSection.newBuilder()
+          .setNumEntry(stringMap.size());
+      b.build().writeDelimitedTo(out);
+      for (Entry<String, Integer> e : stringMap.entrySet()) {
+        StringTableSection.Entry.Builder eb = StringTableSection.Entry
+            .newBuilder().setId(e.getValue()).setStr(e.getKey());
+        eb.build().writeDelimitedTo(out);
+      }
+      commitSection(summary, SectionName.STRING_TABLE);
+    }
+
+    public StringMap getStringMap() {
+      return stringMap;
+    }
+  }
+
+  public static class StringMap {
+    private final Map<String, Integer> stringMap;
+
+    public StringMap() {
+      stringMap = Maps.newHashMap();
+    }
+
+    int getStringId(String str) {
+      if (str == null) {
+        return 0;
+      }
+      Integer v = stringMap.get(str);
+      if (v == null) {
+        int nv = stringMap.size() + 1;
+        stringMap.put(str, nv);
+        return nv;
+      }
+      return v;
+    }
+
+    int size() {
+      return stringMap.size();
+    }
+
+    Set<Entry<String, Integer>> entrySet() {
+      return stringMap.entrySet();
+    }
+  }
+
+  /**
+   * Supported section names. The ordinal of the enum value determines the
+   * order in which the sections are loaded.
+   */
+  public enum SectionName {
+    NS_INFO("NS_INFO"),
+    STRING_TABLE("STRING_TABLE"),
+    INODE("INODE"),
+    SNAPSHOT("SNAPSHOT"),
+    INODE_DIR("INODE_DIR"),
+    FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
+    SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
+    SECRET_MANAGER("SECRET_MANAGER"),
+    CACHE_MANAGER("CACHE_MANAGER");
+
+    private static final SectionName[] values = SectionName.values();
+
+    public static SectionName fromString(String name) {
+      for (SectionName n : values) {
+        if (n.name.equals(name))
+          return n;
+      }
+      return null;
+    }
+
+    private final String name;
+
+    private SectionName(String name) {
+      this.name = name;
+    }
+  }
+
+  private static int getOndiskTrunkSize(com.google.protobuf.GeneratedMessage s) {
+    return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
+        + s.getSerializedSize();
+  }
+
+  private FSImageFormatProtobuf() {
+  }
+}
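
A note on the StringMap above: getStringId() hands out 1-based ids and
reserves 0 for null, which is why the Loader allocates numEntry + 1 slots
for its stringTable. An illustrative usage sketch (not part of this patch;
getStringId() is package-private, so this assumes caller code in
org.apache.hadoop.hdfs.server.namenode):

    FSImageFormatProtobuf.StringMap m = new FSImageFormatProtobuf.StringMap();
    int a = m.getStringId("supergroup"); // 1 -- first distinct string
    int b = m.getStringId("hadoop");     // 2 -- second distinct string
    int c = m.getStringId("supergroup"); // 1 -- duplicates share one id
    int d = m.getStringId(null);         // 0 -- reserved id for null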

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java?rev=1568548&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java Sat Feb 15 00:08:04 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.Loader;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+@InterfaceAudience.Private
+public final class FSImageUtil {
+  public static final byte[] MAGIC_HEADER = "HDFSIMG1".getBytes();
+  public static final int FILE_VERSION = 1;
+
+  public static boolean checkFileFormat(RandomAccessFile file)
+      throws IOException {
+    if (file.length() < Loader.MINIMUM_FILE_LENGTH)
+      return false;
+
+    byte[] magic = new byte[MAGIC_HEADER.length];
+    file.readFully(magic);
+    if (!Arrays.equals(MAGIC_HEADER, magic))
+      return false;
+
+    return true;
+  }
+
+  public static FileSummary loadSummary(RandomAccessFile file)
+      throws IOException {
+    final int FILE_LENGTH_FIELD_SIZE = 4;
+    long fileLength = file.length();
+    file.seek(fileLength - FILE_LENGTH_FIELD_SIZE);
+    int summaryLength = file.readInt();
+
+    if (summaryLength <= 0) {
+      throw new IOException("Invalid file summary length: " + summaryLength);
+    }
+    file.seek(fileLength - FILE_LENGTH_FIELD_SIZE - summaryLength);
+
+    byte[] summaryBytes = new byte[summaryLength];
+    file.readFully(summaryBytes);
+
+    FileSummary summary = FileSummary
+        .parseDelimitedFrom(new ByteArrayInputStream(summaryBytes));
+    if (summary.getOndiskVersion() != FILE_VERSION) {
+      throw new IOException("Unsupported file version "
+          + summary.getOndiskVersion());
+    }
+
+    if (!LayoutVersion.supports(Feature.PROTOBUF_FORMAT,
+        summary.getLayoutVersion())) {
+      throw new IOException("Unsupported layout version "
+          + summary.getLayoutVersion());
+    }
+    return summary;
+  }
+
+  public static InputStream wrapInputStreamForCompression(
+      Configuration conf, String codec, InputStream in) throws IOException {
+    if (codec.isEmpty())
+      return in;
+
+    FSImageCompression compression = FSImageCompression.createCompression(
+        conf, codec);
+    CompressionCodec imageCodec = compression.getImageCodec();
+    return imageCodec.createInputStream(in);
+  }
+
+}
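
Taken together, Saver#saveInternal() and FSImageUtil#loadSummary() imply
the following on-disk layout:

    "HDFSIMG1" magic | section data ... | FileSummary (varint-delimited)
                     | 4-byte big-endian length of the delimited summary

A hypothetical standalone helper (illustrative only, not part of this
patch) that locates one section through the public FSImageUtil API:

    import java.io.IOException;
    import java.io.RandomAccessFile;

    import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
    import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;

    class SectionLocatorSketch {
      /** Returns the named section's summary entry, or null if absent. */
      static FileSummary.Section find(RandomAccessFile file, String name)
          throws IOException {
        if (!FSImageUtil.checkFileFormat(file)) {
          throw new IOException("Not an HDFSIMG1 image");
        }
        // loadSummary() seeks to the trailer and parses the FileSummary;
        // callers then seek to getOffset() and read getLength() bytes to
        // obtain a section body.
        FileSummary summary = FSImageUtil.loadSummary(file);
        for (FileSummary.Section s : summary.getSectionsList()) {
          if (name.equals(s.getName())) {
            return s;
          }
        }
        return null;
      }
    }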

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Feb 15 00:08:04 2014
@@ -180,6 +180,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@@ -197,6 +198,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection.PersistToken;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -5993,6 +5996,15 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * @return all the under-construction files in the lease map
+   */
+  Map<String, INodeFile> getFilesUnderConstruction() {
+    synchronized (leaseManager) {
+      return leaseManager.getINodesUnderConstruction();
+    }
+  }
+
+  /**
    * Register a Backup name-node, verifying that it belongs
    * to the correct namespace, and adding it to the set of
    * active journals if necessary.
@@ -6268,6 +6280,10 @@ public class FSNamesystem implements Nam
     dtSecretManager.saveSecretManagerStateCompat(out, sdPath);
   }
 
+  SecretManagerState saveSecretManagerState() {
+    return dtSecretManager.saveSecretManagerState();
+  }
+
   /**
    * @param in load the state of secret manager from input stream
    */
@@ -6275,6 +6291,12 @@ public class FSNamesystem implements Nam
     dtSecretManager.loadSecretManagerStateCompat(in);
   }
 
+  void loadSecretManagerState(SecretManagerSection s,
+      List<SecretManagerSection.DelegationKey> keys,
+      List<SecretManagerSection.PersistToken> tokens) throws IOException {
+    dtSecretManager.loadSecretManagerState(new SecretManagerState(s, keys, tokens));
+  }
+
   /**
    * Log the updateMasterKey operation to edit logs
    * 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Sat Feb 15 00:08:04 2014
@@ -171,7 +171,7 @@ public class INodeDirectory extends INod
     return children == null? -1: Collections.binarySearch(children, name);
   }
   
-  protected DirectoryWithSnapshotFeature addSnapshotFeature(
+  public DirectoryWithSnapshotFeature addSnapshotFeature(
       DirectoryDiffList diffs) {
     Preconditions.checkState(!isWithSnapshot(), 
         "Directory is already with snapshot");

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Sat Feb 15 00:08:04 2014
@@ -252,7 +252,7 @@ public class INodeFile extends INodeWith
   
   /* Start of Snapshot Feature */
 
-  private FileWithSnapshotFeature addSnapshotFeature(FileDiffList diffs) {
+  public FileWithSnapshotFeature addSnapshotFeature(FileDiffList diffs) {
     Preconditions.checkState(!isWithSnapshot(), 
         "File is already with snapshot");
     FileWithSnapshotFeature sf = new FileWithSnapshotFeature(diffs);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java Sat Feb 15 00:08:04 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,6 +47,10 @@ public class INodeMap {
   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
   
+  public Iterator<INodeWithAdditionalFields> getMapIterator() {
+    return map.iterator();
+  }
+
   private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
     Preconditions.checkArgument(map != null);
     this.map = map;
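
The new getMapIterator() accessor gives callers such as the protobuf saver
a way to walk every inode directly instead of descending the directory
tree. A minimal sketch (illustrative only; assumes same-package access to
an INodeMap instance named inodeMap):

    Iterator<INodeWithAdditionalFields> it = inodeMap.getMapIterator();
    while (it.hasNext()) {
      INodeWithAdditionalFields n = it.next();
      // serialize n, e.g. through FSImageFormatPBINode.Saver
    }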

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java Sat Feb 15 00:08:04 2014
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.util.Canceler;
 
@@ -32,7 +33,8 @@ import com.google.common.base.Preconditi
  * allows cancellation, and also is responsible for accumulating
  * failed storage directories.
  */
-class SaveNamespaceContext {
+@InterfaceAudience.Private
+public class SaveNamespaceContext {
   private final FSNamesystem sourceNamesystem;
   private final long txid;
   private final List<StorageDirectory> errorSDs =
@@ -72,7 +74,7 @@ class SaveNamespaceContext {
     completionLatch.countDown();
   }
 
-  void checkCancelled() throws SaveNamespaceCancelledException {
+  public void checkCancelled() throws SaveNamespaceCancelledException {
     if (canceller.isCancelled()) {
       throw new SaveNamespaceCancelledException(
           canceller.getCancellationReason());

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java?rev=1568548&r1=1568547&r2=1568548&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java Sat Feb 15 00:08:04 2014
@@ -244,7 +244,7 @@ public class DirectoryWithSnapshotFeatur
       this.isSnapshotRoot = isSnapshotRoot;
     }
 
-    ChildrenDiff getChildrenDiff() {
+    public ChildrenDiff getChildrenDiff() {
       return diff;
     }
     
@@ -343,6 +343,10 @@ public class DirectoryWithSnapshotFeatur
       return super.toString() + " childrenSize=" + childrenSize + ", " + diff;
     }
 
+    int getChildrenSize() {
+      return childrenSize;
+    }
+
     @Override
     void write(DataOutput out, ReferenceMap referenceMap) throws IOException {
       writeSnapshot(out);


