hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1152295 [5/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/j...
Date: Fri, 29 Jul 2011 16:28:51 GMT
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jul 29 16:28:45 2011
@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
@@ -382,7 +383,7 @@ public class NameNode implements Namenod
     nodeRegistration = new NamenodeRegistration(
         getHostPortString(rpcAddress),
         getHostPortString(getHttpAddress()),
-        getFSImage().getStorage(), getRole(), getFSImage().getStorage().getCheckpointTime());
+        getFSImage().getStorage(), getRole());
     return nodeRegistration;
   }
 
@@ -647,8 +648,9 @@ public class NameNode implements Namenod
   public NamenodeRegistration register(NamenodeRegistration registration)
   throws IOException {
     verifyVersion(registration.getVersion());
-    namesystem.registerBackupNode(registration);
-    return setRegistration();
+    NamenodeRegistration myRegistration = setRegistration();
+    namesystem.registerBackupNode(registration, myRegistration);
+    return myRegistration;
   }
 
   @Override // NamenodeProtocol
@@ -669,22 +671,6 @@ public class NameNode implements Namenod
     namesystem.endCheckpoint(registration, sig);
   }
 
-  @Override // NamenodeProtocol
-  public long journalSize(NamenodeRegistration registration)
-  throws IOException {
-    verifyRequest(registration);
-    return namesystem.getEditLogSize();
-  }
-
-  @Override // NamenodeProtocol
-  public void journal(NamenodeRegistration registration,
-                      int jAction,
-                      int length,
-                      byte[] args) throws IOException {
-    // Active name-node cannot journal.
-    throw new UnsupportedActionException("journal");
-  }
-
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
@@ -1056,21 +1042,20 @@ public class NameNode implements Namenod
     namesystem.refreshNodes(new HdfsConfiguration());
   }
 
-  @Deprecated // NamenodeProtocol
-  public long getEditLogSize() throws IOException {
-    return namesystem.getEditLogSize();
+  @Override // NamenodeProtocol
+  public long getTransactionID() {
+    return namesystem.getTransactionID();
   }
 
-  @Deprecated
   @Override // NamenodeProtocol
   public CheckpointSignature rollEditLog() throws IOException {
     return namesystem.rollEditLog();
   }
-
-  @Deprecated
-  @Override // NamenodeProtocol
-  public void rollFsImage(CheckpointSignature sig) throws IOException {
-    namesystem.rollFSImage(sig);
+  
+  @Override
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+  throws IOException {
+    return namesystem.getEditLogManifest(sinceTxId);
   }
     
   @Override // ClientProtocol
@@ -1279,27 +1264,12 @@ public class NameNode implements Namenod
     if (version != LAYOUT_VERSION)
       throw new IncorrectVersionException(version, "data node");
   }
-
-  /**
-   * Returns the name of the fsImage file
-   */
-  public File getFsImageName() throws IOException {
-    return getFSImage().getStorage().getFsImageName();
-  }
     
   public FSImage getFSImage() {
     return namesystem.dir.fsImage;
   }
 
   /**
-   * Returns the name of the fsImage file uploaded by periodic
-   * checkpointing
-   */
-  public File[] getFsImageNameCheckpoint() throws IOException {
-    return getFSImage().getStorage().getFsImageNameCheckpoint();
-  }
-
-  /**
    * Returns the address on which the NameNodes is listening to.
    * @return namenode rpc address
    */
@@ -1374,20 +1344,16 @@ public class NameNode implements Namenod
     }
     System.out.println("Formatting using clusterid: " + clusterId);
     
-    FSImage fsImage = new FSImage(dirsToFormat, editDirsToFormat);
+    FSImage fsImage = new FSImage(conf, null, dirsToFormat, editDirsToFormat);
     FSNamesystem nsys = new FSNamesystem(fsImage, conf);
-    nsys.dir.fsImage.getStorage().format(clusterId);
+    nsys.dir.fsImage.format(clusterId);
     return false;
   }
 
   private static boolean finalize(Configuration conf,
                                boolean isConfirmationNeeded
                                ) throws IOException {
-    Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
-    Collection<URI> editDirsToFormat = 
-                               FSNamesystem.getNamespaceEditsDirs(conf);
-    FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
-                                         editDirsToFormat), conf);
+    FSNamesystem nsys = new FSNamesystem(new FSImage(conf), conf);
     System.err.print(
         "\"finalize\" will remove the previous state of the files system.\n"
         + "Recent upgrade will become permanent.\n"

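For context, the hunks above replace the old byte-count checkpoint trigger (getEditLogSize) and the rollFsImage round-trip with txid-based calls. From a checkpointer's side the new handshake looks roughly like this (a minimal sketch; storage, namenode, and checkpointTxnCount stand in for the real fields, and the download/apply plumbing is omitted):

    // Sketch only: decide whether to checkpoint, then enumerate the
    // finalized edit segments by transaction id.
    long lastCheckpointTxId = storage.getMostRecentCheckpointTxId();
    long curTxId = namenode.getTransactionID();
    if (curTxId - lastCheckpointTxId >= checkpointTxnCount) {
      // Ask the NN to finalize the current log segment and open a new one.
      CheckpointSignature sig = namenode.rollEditLog();
      // Fetch the manifest of every segment after the last checkpoint.
      RemoteEditLogManifest manifest =
          namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);
      for (RemoteEditLog log : manifest.getLogs()) {
        // download txns log.getStartTxId()..log.getEndTxId() and apply them
      }
    }
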
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Jul 29 16:28:45 2011
@@ -23,11 +23,19 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -43,10 +51,11 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -60,6 +69,9 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
 /**********************************************************
  * The Secondary NameNode is a helper to the primary NameNode.
  * The Secondary is responsible for supporting periodic checkpoints 
@@ -98,11 +110,18 @@ public class SecondaryNameNode implement
   private int imagePort;
   private String infoBindAddress;
 
-  private FSNamesystem namesystem;
   private Collection<URI> checkpointDirs;
   private Collection<URI> checkpointEditsDirs;
+  
+  /** How often to checkpoint regardless of number of txns */
   private long checkpointPeriod;    // in seconds
-  private long checkpointSize;    // size (in bytes) of current Edit Log
+  
+  /** How often to poll the NN to check checkpointTxnCount */
+  private long checkpointCheckPeriod; // in seconds
+  
+  /** checkpoint once every this many transactions, regardless of time */
+  private long checkpointTxnCount;
+
 
   /** {@inheritDoc} */
   public String toString() {
@@ -111,23 +130,49 @@ public class SecondaryNameNode implement
       + "\nStart Time           : " + new Date(starttime)
       + "\nLast Checkpoint Time : " + (lastCheckpointTime == 0? "--": new Date(lastCheckpointTime))
       + "\nCheckpoint Period    : " + checkpointPeriod + " seconds"
-      + "\nCheckpoint Size      : " + StringUtils.byteDesc(checkpointSize)
-                                    + " (= " + checkpointSize + " bytes)" 
+      + "\nCheckpoint Size      : " + StringUtils.byteDesc(checkpointTxnCount)
+                                    + " (= " + checkpointTxnCount + " bytes)" 
       + "\nCheckpoint Dirs      : " + checkpointDirs
       + "\nCheckpoint Edits Dirs: " + checkpointEditsDirs;
   }
 
+  @VisibleForTesting
   FSImage getFSImage() {
     return checkpointImage;
   }
+  
+  @VisibleForTesting
+  void setFSImage(CheckpointStorage image) {
+    this.checkpointImage = image;
+  }
+  
+  @VisibleForTesting
+  NamenodeProtocol getNameNode() {
+    return namenode;
+  }
+  
+  @VisibleForTesting
+  void setNameNode(NamenodeProtocol namenode) {
+    this.namenode = namenode;
+  }
 
+  @VisibleForTesting
+  List<URI> getCheckpointDirs() {
+    return ImmutableList.copyOf(checkpointDirs);
+  }
+  
   /**
    * Create a connection to the primary namenode.
    */
   public SecondaryNameNode(Configuration conf)  throws IOException {
+    this(conf, new CommandLineOpts());
+  }
+  
+  public SecondaryNameNode(Configuration conf,
+      CommandLineOpts commandLineOpts) throws IOException {
     try {
       NameNode.initializeGenericKeys(conf);
-      initialize(conf);
+      initialize(conf, commandLineOpts);
     } catch(IOException e) {
       shutdown();
       LOG.fatal("Failed to start secondary namenode. ", e);
@@ -143,8 +188,10 @@ public class SecondaryNameNode implement
   
   /**
    * Initialize SecondaryNameNode.
+   * @param commandLineOpts 
    */
-  private void initialize(final Configuration conf) throws IOException {
+  private void initialize(final Configuration conf,
+      CommandLineOpts commandLineOpts) throws IOException {
     final InetSocketAddress infoSocAddr = getHttpAddress(conf);
     infoBindAddress = infoSocAddr.getHostName();
     UserGroupInformation.setConfiguration(conf);
@@ -171,14 +218,19 @@ public class SecondaryNameNode implement
                                   "/tmp/hadoop/dfs/namesecondary");
     checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, 
                                   "/tmp/hadoop/dfs/namesecondary");    
-    checkpointImage = new CheckpointStorage(conf);
-    checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
+    checkpointImage = new CheckpointStorage(conf, checkpointDirs, checkpointEditsDirs);
+    checkpointImage.recoverCreate(commandLineOpts.shouldFormat());
 
     // Initialize other scheduling parameters from the configuration
+    checkpointCheckPeriod = conf.getLong(
+        DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
+        DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT);
+        
     checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 
                                     DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
-    checkpointSize = conf.getLong(DFS_NAMENODE_CHECKPOINT_SIZE_KEY, 
-                                  DFS_NAMENODE_CHECKPOINT_SIZE_DEFAULT);
+    checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 
+                                  DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+    warnForDeprecatedConfigs(conf);
 
     // initialize the webserver for uploading files.
     // Kerberized SSL servers must be run from the host principal...
@@ -204,8 +256,8 @@ public class SecondaryNameNode implement
             System.setProperty("https.cipherSuites", 
                 Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
             InetSocketAddress secInfoSocAddr = 
-              NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.get(
-                "dfs.secondary.https.port", infoBindAddress + ":" + 0));
+              NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.getInt(
+                "dfs.secondary.https.port", 443));
             imagePort = secInfoSocAddr.getPort();
             infoServer.addSslListener(secInfoSocAddr, conf, false, true);
           }
@@ -227,15 +279,28 @@ public class SecondaryNameNode implement
 
     // The web-server port can be ephemeral... ensure we have the correct info
     infoPort = infoServer.getPort();
-    if(!UserGroupInformation.isSecurityEnabled())
+    if (!UserGroupInformation.isSecurityEnabled()) {
       imagePort = infoPort;
+    }
+    
     conf.set(DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" +infoPort); 
     LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
     LOG.info("Secondary image servlet up at: " + infoBindAddress + ":" + imagePort);
-    LOG.warn("Checkpoint Period   :" + checkpointPeriod + " secs " +
+    LOG.info("Checkpoint Period   :" + checkpointPeriod + " secs " +
              "(" + checkpointPeriod/60 + " min)");
-    LOG.warn("Log Size Trigger    :" + checkpointSize + " bytes " +
-             "(" + checkpointSize/1024 + " KB)");
+    LOG.info("Log Size Trigger    :" + checkpointTxnCount + " txns");
+  }
+
+  static void warnForDeprecatedConfigs(Configuration conf) {
+    for (String key : ImmutableList.of(
+          "fs.checkpoint.size",
+          "dfs.namenode.checkpoint.size")) {
+      if (conf.get(key) != null) {
+        LOG.warn("Configuration key " + key + " is deprecated! Ignoring..." +
+            " Instead please specify a value for " +
+            DFS_NAMENODE_CHECKPOINT_TXNS_KEY);
+      }
+    }
   }
 
   /**
@@ -283,13 +348,10 @@ public class SecondaryNameNode implement
   public void doWork() {
 
     //
-    // Poll the Namenode (once every 5 minutes) to find the size of the
-    // pending edit log.
+    // Poll the Namenode (once every checkpointCheckPeriod seconds) to find the
+    // number of transactions in the edit log that haven't yet been checkpointed.
     //
-    long period = 5 * 60;              // 5 minutes
-    if (checkpointPeriod < period) {
-      period = checkpointPeriod;
-    }
+    long period = Math.min(checkpointCheckPeriod, checkpointPeriod);
 
     while (shouldRun) {
       try {
@@ -307,8 +369,7 @@ public class SecondaryNameNode implement
         
         long now = System.currentTimeMillis();
 
-        long size = namenode.getEditLogSize();
-        if (size >= checkpointSize || 
+        if (shouldCheckpointBasedOnCount() ||
             now >= lastCheckpointTime + 1000 * checkpointPeriod) {
           doCheckpoint();
           lastCheckpointTime = now;
@@ -316,7 +377,6 @@ public class SecondaryNameNode implement
       } catch (IOException e) {
         LOG.error("Exception in doCheckpoint", e);
         e.printStackTrace();
-        checkpointImage.getStorage().imageDigest = null;
       } catch (Throwable e) {
         LOG.error("Throwable Exception in doCheckpoint", e);
         e.printStackTrace();
@@ -331,49 +391,53 @@ public class SecondaryNameNode implement
    * @return true if a new image has been downloaded and needs to be loaded
    * @throws IOException
    */
-  private boolean downloadCheckpointFiles(final CheckpointSignature sig
-                                      ) throws IOException {
+  static boolean downloadCheckpointFiles(
+      final String nnHostPort,
+      final FSImage dstImage,
+      final CheckpointSignature sig,
+      final RemoteEditLogManifest manifest
+  ) throws IOException {
+    
+    // Sanity check manifest - these could happen if, eg, someone on the
+    // NN side accidentally rmed the storage directories
+    if (manifest.getLogs().isEmpty()) {
+      throw new IOException("Found no edit logs to download on NN since txid " 
+          + sig.mostRecentCheckpointTxId);
+    }
+    
+    long expectedTxId = sig.mostRecentCheckpointTxId + 1;
+    if (manifest.getLogs().get(0).getStartTxId() != expectedTxId) {
+      throw new IOException("Bad edit log manifest (expected txid = " +
+          expectedTxId + "): " + manifest);
+    }
+
     try {
         Boolean b = UserGroupInformation.getCurrentUser().doAs(
             new PrivilegedExceptionAction<Boolean>() {
   
           @Override
           public Boolean run() throws Exception {
-            checkpointImage.getStorage().cTime = sig.cTime;
-            checkpointImage.getStorage().setCheckpointTime(sig.checkpointTime);
+            dstImage.getStorage().cTime = sig.cTime;
 
             // get fsimage
-            String fileid;
-            Collection<File> list;
-            File[] srcNames;
             boolean downloadImage = true;
-            if (sig.imageDigest.equals(
-                    checkpointImage.getStorage().imageDigest)) {
+            if (sig.mostRecentCheckpointTxId ==
+                dstImage.getStorage().getMostRecentCheckpointTxId()) {
               downloadImage = false;
               LOG.info("Image has not changed. Will not download image.");
             } else {
-              fileid = "getimage=1";
-              list = checkpointImage.getStorage().getFiles(
-                  NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
-              srcNames = list.toArray(new File[list.size()]);
-              assert srcNames.length > 0 : "No checkpoint targets.";
-              TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
-              checkpointImage.getStorage().imageDigest = sig.imageDigest;
-              LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
-                  srcNames[0].length() + " bytes.");
+              MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
+                  nnHostPort, sig.mostRecentCheckpointTxId, dstImage.getStorage(), true);
+              dstImage.saveDigestAndRenameCheckpointImage(
+                  sig.mostRecentCheckpointTxId, downloadedHash);
             }
         
             // get edits file
-            fileid = "getedit=1";
-            list = getFSImage().getStorage().getFiles(
-                NameNodeFile.EDITS, NameNodeDirType.EDITS);
-            srcNames = list.toArray(new File[list.size()]);;
-            assert srcNames.length > 0 : "No checkpoint targets.";
-            TransferFsImage.getFileClient(fsName, fileid, srcNames, false);
-            LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
-                srcNames[0].length() + " bytes.");
+            for (RemoteEditLog log : manifest.getLogs()) {
+              TransferFsImage.downloadEditsToStorage(
+                  nnHostPort, log, dstImage.getStorage());
+            }
         
-            checkpointImage.checkpointUploadDone();
             return Boolean.valueOf(downloadImage);
           }
         });
@@ -388,18 +452,6 @@ public class SecondaryNameNode implement
   }
 
   /**
-   * Copy the new fsimage into the NameNode
-   */
-  private void putFSImage(CheckpointSignature sig) throws IOException {
-    String fileid = "putimage=1&port=" + imagePort +
-      "&machine=" + infoBindAddress + 
-      "&token=" + sig.toString() +
-      "&newChecksum=" + checkpointImage.getStorage().getImageDigest();
-    LOG.info("Posted URL " + fsName + fileid);
-    TransferFsImage.getFileClient(fsName, fileid, (File[])null, false);
-  }
-
-  /**
    * Returns the Jetty server that the Namenode is listening on.
    */
   private String getInfoServer() throws IOException {
@@ -423,19 +475,39 @@ public class SecondaryNameNode implement
       return configuredAddress;
     }
   }
+  
+  /**
+   * Return the host:port of where this SecondaryNameNode is listening
+   * for image transfers
+   */
+  private InetSocketAddress getImageListenAddress() {
+    return new InetSocketAddress(infoBindAddress, imagePort);
+  }
 
   /**
    * Create a new checkpoint
    * @return if the image is fetched from primary or not
    */
   boolean doCheckpoint() throws IOException {
-
-    // Do the required initialization of the merge work area.
-    startCheckpoint();
-
+    checkpointImage.ensureCurrentDirExists();
+    NNStorage dstStorage = checkpointImage.getStorage();
+    
     // Tell the namenode to start logging transactions in a new edit file
     // Returns a token that would be used to upload the merged image.
     CheckpointSignature sig = namenode.rollEditLog();
+    
+    // Make sure we're talking to the same NN!
+    if (checkpointImage.getNamespaceID() != 0) {
+      // If the image actually has some data, make sure we're talking
+      // to the same NN as we did before.
+      sig.validateStorageInfo(checkpointImage);
+    } else {
+      // if we're a fresh 2NN, just take the storage info from the server
+      // we first talk to.
+      dstStorage.setStorageInfo(sig);
+      dstStorage.setClusterID(sig.getClusterID());
+      dstStorage.setBlockPoolID(sig.getBlockpoolID());
+    }
 
     // error simulation code for junit test
     if (ErrorSimulator.getErrorSimulation(0)) {
@@ -443,14 +515,20 @@ public class SecondaryNameNode implement
                             "after creating edits.new");
     }
 
-    boolean loadImage = downloadCheckpointFiles(sig);   // Fetch fsimage and edits
-    doMerge(sig, loadImage);                   // Do the merge
-  
+    RemoteEditLogManifest manifest =
+      namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);
+
+    boolean loadImage = downloadCheckpointFiles(
+        fsName, checkpointImage, sig, manifest);   // Fetch fsimage and edits
+    doMerge(sig, manifest, loadImage, checkpointImage);
+    
     //
     // Upload the new image into the NameNode. Then tell the Namenode
     // to make this new uploaded image as the most current image.
     //
-    putFSImage(sig);
+    long txid = checkpointImage.getLastAppliedTxId();
+    TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
+        dstStorage, txid);
 
     // error simulation code for junit test
     if (ErrorSimulator.getErrorSimulation(1)) {
@@ -458,91 +536,53 @@ public class SecondaryNameNode implement
                             "after uploading new image to NameNode");
     }
 
-    namenode.rollFsImage(sig);
-    checkpointImage.endCheckpoint();
-
     LOG.warn("Checkpoint done. New Image Size: " 
-             + checkpointImage.getStorage().getFsImageName().length());
+             + dstStorage.getFsImageName(txid).length());
+    
+    // Since we've successfully checkpointed, we can remove some old
+    // image files
+    checkpointImage.purgeOldStorage();
     
     return loadImage;
   }
-
-  private void startCheckpoint() throws IOException {
-    checkpointImage.getStorage().unlockAll();
-    checkpointImage.getEditLog().close();
-    checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
-    checkpointImage.startCheckpoint();
-  }
-
-  /**
-   * Merge downloaded image and edits and write the new image into
-   * current storage directory.
-   */
-  private void doMerge(CheckpointSignature sig, boolean loadImage)
-  throws IOException {
-    if (loadImage) {
-      namesystem = new FSNamesystem(checkpointImage, conf);
-    }
-    assert namesystem.dir.fsImage == checkpointImage;
-    checkpointImage.doMerge(sig, loadImage);
-  }
-
+  
+  
   /**
    * @param argv The parameters passed to this program.
    * @exception Exception if the filesystem does not exist.
    * @return 0 on success, non zero on error.
    */
-  private int processArgs(String[] argv) throws Exception {
-
-    if (argv.length < 1) {
-      printUsage("");
-      return -1;
-    }
-
-    int exitCode = -1;
-    int i = 0;
-    String cmd = argv[i++];
-
-    //
-    // verify that we have enough command line parameters
-    //
-    if ("-geteditsize".equals(cmd)) {
-      if (argv.length != 1) {
-        printUsage(cmd);
-        return exitCode;
-      }
-    } else if ("-checkpoint".equals(cmd)) {
-      if (argv.length != 1 && argv.length != 2) {
-        printUsage(cmd);
-        return exitCode;
-      }
-      if (argv.length == 2 && !"force".equals(argv[i])) {
-        printUsage(cmd);
-        return exitCode;
-      }
+  private int processStartupCommand(CommandLineOpts opts) throws Exception {
+    if (opts.getCommand() == null) {
+      return 0;
     }
-
-    exitCode = 0;
+    
+    String cmd = opts.getCommand().toString().toLowerCase();
+    
+    int exitCode = 0;
     try {
-      if ("-checkpoint".equals(cmd)) {
-        long size = namenode.getEditLogSize();
-        if (size >= checkpointSize || 
-            argv.length == 2 && "force".equals(argv[i])) {
+      switch (opts.getCommand()) {
+      case CHECKPOINT:
+        long count = countUncheckpointedTxns();
+        if (count > checkpointTxnCount ||
+            opts.shouldForceCheckpoint()) {
           doCheckpoint();
         } else {
-          System.err.println("EditLog size " + size + " bytes is " +
+          System.err.println("EditLog size " + count + " transactions is " +
                              "smaller than configured checkpoint " +
-                             "size " + checkpointSize + " bytes.");
+                             "interval " + checkpointTxnCount + " transactions.");
           System.err.println("Skipping checkpoint.");
         }
-      } else if ("-geteditsize".equals(cmd)) {
-        long size = namenode.getEditLogSize();
-        System.out.println("EditLog size is " + size + " bytes");
-      } else {
-        exitCode = -1;
-        LOG.error(cmd.substring(1) + ": Unknown command");
-        printUsage("");
+        break;
+      case GETEDITSIZE:
+        long uncheckpointed = countUncheckpointedTxns();
+        System.out.println("NameNode has " + uncheckpointed +
+            " uncheckpointed transactions");
+        break;
+      default:
+        throw new AssertionError("bad command enum: " + opts.getCommand());
       }
+      
     } catch (RemoteException e) {
       //
       // This is a error returned by hadoop server. Print
@@ -551,41 +591,32 @@ public class SecondaryNameNode implement
       try {
         String[] content;
         content = e.getLocalizedMessage().split("\n");
-        LOG.error(cmd.substring(1) + ": "
-                  + content[0]);
+        LOG.error(cmd + ": " + content[0]);
       } catch (Exception ex) {
-        LOG.error(cmd.substring(1) + ": "
-                  + ex.getLocalizedMessage());
+        LOG.error(cmd + ": " + ex.getLocalizedMessage());
       }
     } catch (IOException e) {
       //
       // IO exception encountered locally.
       //
       exitCode = -1;
-      LOG.error(cmd.substring(1) + ": "
-                + e.getLocalizedMessage());
+      LOG.error(cmd + ": " + e.getLocalizedMessage());
     } finally {
       // Does the RPC connection need to be closed?
     }
     return exitCode;
   }
 
-  /**
-   * Displays format of commands.
-   * @param cmd The command that is being executed.
-   */
-  private void printUsage(String cmd) {
-    if ("-geteditsize".equals(cmd)) {
-      System.err.println("Usage: java SecondaryNameNode"
-                         + " [-geteditsize]");
-    } else if ("-checkpoint".equals(cmd)) {
-      System.err.println("Usage: java SecondaryNameNode"
-                         + " [-checkpoint [force]]");
-    } else {
-      System.err.println("Usage: java SecondaryNameNode " +
-                         "[-checkpoint [force]] " +
-                         "[-geteditsize] ");
-    }
+  private long countUncheckpointedTxns() throws IOException {
+    long curTxId = namenode.getTransactionID();
+    long uncheckpointedTxns = curTxId -
+      checkpointImage.getStorage().getMostRecentCheckpointTxId();
+    assert uncheckpointedTxns >= 0;
+    return uncheckpointedTxns;
+  }
+
+  boolean shouldCheckpointBasedOnCount() throws IOException {
+    return countUncheckpointedTxns() >= checkpointTxnCount;
   }
 
   /**
@@ -594,41 +625,151 @@ public class SecondaryNameNode implement
    * @exception Exception if the filesystem does not exist.
    */
   public static void main(String[] argv) throws Exception {
+    CommandLineOpts opts = SecondaryNameNode.parseArgs(argv);
+    if (opts == null) {
+      System.exit(-1);
+    }
+    
     StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
     Configuration tconf = new HdfsConfiguration();
-    if (argv.length >= 1) {
-      SecondaryNameNode secondary = new SecondaryNameNode(tconf);
-      int ret = secondary.processArgs(argv);
+    SecondaryNameNode secondary = new SecondaryNameNode(tconf, opts);
+
+    if (opts.getCommand() != null) {
+      int ret = secondary.processStartupCommand(opts);
       System.exit(ret);
     }
 
     // Create a never ending deamon
-    Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); 
+    Daemon checkpointThread = new Daemon(secondary);
     checkpointThread.start();
   }
+  
+  
+  /**
+   * Container for parsed command-line options.
+   */
+  @SuppressWarnings("static-access")
+  static class CommandLineOpts {
+    private final Options options = new Options();
+    
+    private final Option geteditsizeOpt;
+    private final Option checkpointOpt;
+    private final Option formatOpt;
+
 
+    Command cmd;
+    enum Command {
+      GETEDITSIZE,
+      CHECKPOINT;
+    }
+    
+    private boolean shouldForce;
+    private boolean shouldFormat;
+
+    CommandLineOpts() {
+      geteditsizeOpt = new Option("geteditsize",
+        "return the number of uncheckpointed transactions on the NameNode");
+      checkpointOpt = OptionBuilder.withArgName("force")
+        .hasOptionalArg().withDescription("checkpoint on startup").create("checkpoint");
+      formatOpt = new Option("format", "format the local storage during startup");
+      
+      options.addOption(geteditsizeOpt);
+      options.addOption(checkpointOpt);
+      options.addOption(formatOpt);
+    }
+    
+    public boolean shouldFormat() {
+      return shouldFormat;
+    }
+
+    public void parse(String ... argv) throws ParseException {
+      CommandLineParser parser = new PosixParser();
+      CommandLine cmdLine = parser.parse(options, argv);
+      
+      boolean hasGetEdit = cmdLine.hasOption(geteditsizeOpt.getOpt());
+      boolean hasCheckpoint = cmdLine.hasOption(checkpointOpt.getOpt()); 
+      if (hasGetEdit && hasCheckpoint) {
+        throw new ParseException("May not pass both "
+            + geteditsizeOpt.getOpt() + " and "
+            + checkpointOpt.getOpt());
+      }
+      
+      if (hasGetEdit) {
+        cmd = Command.GETEDITSIZE;
+      } else if (hasCheckpoint) {
+        cmd = Command.CHECKPOINT;
+        
+        String arg = cmdLine.getOptionValue(checkpointOpt.getOpt());
+        if ("force".equals(arg)) {
+          shouldForce = true;
+        } else if (arg != null) {
+          throw new ParseException("-checkpoint may only take 'force' as an "
+              + "argument");
+        }
+      }
+      
+      if (cmdLine.hasOption(formatOpt.getOpt())) {
+        shouldFormat = true;
+      }
+    }
+    
+    public Command getCommand() {
+      return cmd;
+    }
+    
+    public boolean shouldForceCheckpoint() {
+      return shouldForce;
+    }
+    
+    void usage() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("secondarynamenode", options);
+    }
+  }
+
+  private static CommandLineOpts parseArgs(String[] argv) {
+    CommandLineOpts opts = new CommandLineOpts();
+    try {
+      opts.parse(argv);
+    } catch (ParseException pe) {
+      LOG.error(pe.getMessage());
+      opts.usage();
+      return null;
+    }
+    return opts;
+  }
+  
   static class CheckpointStorage extends FSImage {
     /**
+     * Construct a checkpoint image.
+     * @param conf Node configuration.
+     * @param imageDirs URIs of storage for image.
+     * @param editsDirs URIs of storage for edit logs.
+     * @throws IOException If storage cannot be accessed.
      */
-    CheckpointStorage(Configuration conf) throws IOException {
-      super(conf);
+    CheckpointStorage(Configuration conf, 
+                      Collection<URI> imageDirs,
+                      Collection<URI> editsDirs) throws IOException {
+      super(conf, (FSNamesystem)null, imageDirs, editsDirs);
+      setFSNamesystem(new FSNamesystem(this, conf));
+      
+      // the 2NN never writes edits -- it only downloads them. So
+      // we shouldn't have any editLog instance. Setting to null
+      // makes sure we don't accidentally depend on it.
+      editLog = null;
     }
 
     /**
      * Analyze checkpoint directories.
      * Create directories if they do not exist.
-     * Recover from an unsuccessful checkpoint is necessary. 
-     * 
-     * @param dataDirs
-     * @param editsDirs
+     * Recover from an unsuccessful checkpoint if necessary.
+     *
      * @throws IOException
      */
-    void recoverCreate(Collection<URI> dataDirs,
-                       Collection<URI> editsDirs) throws IOException {
-      Collection<URI> tempDataDirs = new ArrayList<URI>(dataDirs);
-      Collection<URI> tempEditsDirs = new ArrayList<URI>(editsDirs);
-      storage.close();
-      storage.setStorageDirectories(tempDataDirs, tempEditsDirs);
+    void recoverCreate(boolean format) throws IOException {
+      storage.attemptRestoreRemovedStorage();
+      storage.unlockAll();
+
       for (Iterator<StorageDirectory> it = 
                    storage.dirIterator(); it.hasNext();) {
         StorageDirectory sd = it.next();
@@ -643,6 +784,13 @@ public class SecondaryNameNode implement
         if(!isAccessible)
           throw new InconsistentFSStateException(sd.getRoot(),
               "cannot access checkpoint directory.");
+        
+        if (format) {
+          // Don't confirm, since this is just the secondary namenode.
+          LOG.info("Formatting storage directory " + sd);
+          sd.clearDirectory();
+        }
+        
         StorageState curState;
         try {
           curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR, storage);
@@ -655,6 +803,11 @@ public class SecondaryNameNode implement
           case NOT_FORMATTED:
             break;  // it's ok since initially there is no current and VERSION
           case NORMAL:
+            // Read the VERSION file. This verifies that:
+            // (a) the VERSION file for each of the directories is the same,
+            // and (b) when we connect to a NN, we can verify that the remote
+            // node matches the same namespace that we ran on previously.
+            storage.readProperties(sd);
             break;
           default:  // recovery is possible
             sd.doRecover(curState);
@@ -665,63 +818,41 @@ public class SecondaryNameNode implement
         }
       }
     }
-
+    
     /**
-     * Prepare directories for a new checkpoint.
-     * <p>
-     * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
-     * and recreate <code>current</code>.
-     * @throws IOException
+     * Ensure that the current/ directory exists in all storage
+     * directories
      */
-    void startCheckpoint() throws IOException {
+    void ensureCurrentDirExists() throws IOException {
       for (Iterator<StorageDirectory> it
              = storage.dirIterator(); it.hasNext();) {
         StorageDirectory sd = it.next();
-        storage.moveCurrent(sd);
-      }
-    }
-
-    void endCheckpoint() throws IOException {
-      for (Iterator<StorageDirectory> it
-             = storage.dirIterator(); it.hasNext();) {
-        StorageDirectory sd = it.next();
-        storage.moveLastCheckpoint(sd);
+        File curDir = sd.getCurrentDir();
+        if (!curDir.exists() && !curDir.mkdirs()) {
+          throw new IOException("Could not create directory " + curDir);
+        }
       }
     }
-
-    /**
-     * Merge image and edits, and verify consistency with the signature.
-     */
-    private void doMerge(CheckpointSignature sig, boolean loadImage)
-    throws IOException {
-      getEditLog().open();
-      StorageDirectory sdName = null;
-      StorageDirectory sdEdits = null;
-      Iterator<StorageDirectory> it = null;
-      if (loadImage) {
-        it = getStorage().dirIterator(NameNodeDirType.IMAGE);
-        if (it.hasNext())
-          sdName = it.next();
-        if (sdName == null) {
-          throw new IOException("Could not locate checkpoint fsimage");
-        }
+  }
+    
+  static void doMerge(
+      CheckpointSignature sig, RemoteEditLogManifest manifest,
+      boolean loadImage, FSImage dstImage) throws IOException {   
+    NNStorage dstStorage = dstImage.getStorage();
+    
+    dstStorage.setStorageInfo(sig);
+    if (loadImage) {
+      File file = dstStorage.findImageFile(sig.mostRecentCheckpointTxId);
+      if (file == null) {
+        throw new IOException("Couldn't find image file at txid " + 
+            sig.mostRecentCheckpointTxId + " even though it should have " +
+            "just been downloaded");
       }
-      it = getStorage().dirIterator(NameNodeDirType.EDITS);
-      if (it.hasNext())
-        sdEdits = it.next();
-      if (sdEdits == null)
-        throw new IOException("Could not locate checkpoint edits");
-      if (loadImage) {
-        // to avoid assert in loadFSImage()
-        this.getStorage().layoutVersion = -1;
-        getStorage();
-        loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
-      }
-      loadFSEdits(sdEdits);
-      storage.setClusterID(sig.getClusterID());
-      storage.setBlockPoolID(sig.getBlockpoolID());
-      sig.validateStorageInfo(this);
-      saveNamespace(false);
+      dstImage.reloadFromImageFile(file);
     }
+    
+    Checkpointer.rollForwardByApplyingLogs(manifest, dstImage);
+    dstImage.saveFSImageInAllDirs(dstImage.getLastAppliedTxId());
+    dstStorage.writeAll();
   }
 }

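The startup-option handling above is plain commons-cli. A quick sketch of how the parsed options drive startup (the argument strings here are the only assumption; the ParseException handling shown in parseArgs() is omitted):

    // Sketch: parse startup flags the way main() now does via parseArgs().
    CommandLineOpts opts = new CommandLineOpts();
    opts.parse("-checkpoint", "force");    // or "-geteditsize", "-format"

    if (opts.shouldFormat()) {
      // recoverCreate(true) will clear the local checkpoint directories
    }
    if (opts.getCommand() == CommandLineOpts.Command.CHECKPOINT
        && opts.shouldForceCheckpoint()) {
      // doCheckpoint() runs even if the txn threshold hasn't been reached
    }
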
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Fri Jul 29 16:28:45 2011
@@ -21,19 +21,22 @@ import java.io.*;
 import java.net.*;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.List;
 import java.lang.Math;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpServletRequest;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.DFSUtil.ErrorSimulator;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.collect.Lists;
+
 
 /**
  * This class provides fetching a specified file from the NameNode.
@@ -41,88 +44,77 @@ import org.apache.hadoop.security.UserGr
 class TransferFsImage implements FSConstants {
   
   public final static String CONTENT_LENGTH = "Content-Length";
-  
-  private boolean isGetImage;
-  private boolean isGetEdit;
-  private boolean isPutImage;
-  private int remoteport;
-  private String machineName;
-  private CheckpointSignature token;
-  private MD5Hash newChecksum = null;
-  
-  /**
-   * File downloader.
-   * @param pmap key=value[] map that is passed to the http servlet as 
-   *        url parameters
-   * @param request the object from which this servelet reads the url contents
-   * @param response the object into which this servelet writes the url contents
-   * @throws IOException
-   */
-  public TransferFsImage(Map<String,String[]> pmap,
-                         HttpServletRequest request,
-                         HttpServletResponse response
-                         ) throws IOException {
-    isGetImage = isGetEdit = isPutImage = false;
-    remoteport = 0;
-    machineName = null;
-    token = null;
+  public final static String MD5_HEADER = "X-MD5-Digest";
 
-    for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
-      String key = it.next();
-      if (key.equals("getimage")) { 
-        isGetImage = true;
-      } else if (key.equals("getedit")) { 
-        isGetEdit = true;
-      } else if (key.equals("putimage")) { 
-        isPutImage = true;
-      } else if (key.equals("port")) { 
-        remoteport = new Integer(pmap.get("port")[0]).intValue();
-      } else if (key.equals("machine")) { 
-        machineName = pmap.get("machine")[0];
-      } else if (key.equals("token")) { 
-        token = new CheckpointSignature(pmap.get("token")[0]);
-      } else if (key.equals("newChecksum")) {
-        newChecksum = new MD5Hash(pmap.get("newChecksum")[0]);
-      }
-    }
+  private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
 
-    int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
-    if ((numGets > 1) || (numGets == 0) && !isPutImage) {
-      throw new IOException("Illegal parameters to TransferFsImage");
+  static MD5Hash downloadImageToStorage(
+      String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
+      throws IOException {
+    String fileid = GetImageServlet.getParamStringForImage(
+        imageTxId, dstStorage);
+    String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
+    
+    List<File> dstFiles = dstStorage.getFiles(
+        NameNodeDirType.IMAGE, fileName);
+    if (dstFiles.isEmpty()) {
+      throw new IOException("No targets in destination storage!");
     }
+    
+    MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
+    LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
+        dstFiles.get(0).length() + " bytes.");
+    return hash;
   }
+  
+  static void downloadEditsToStorage(String fsName, RemoteEditLog log,
+      NNStorage dstStorage) throws IOException {
+    String fileid = GetImageServlet.getParamStringForLog(
+        log, dstStorage);
+    String fileName = NNStorage.getFinalizedEditsFileName(
+        log.getStartTxId(), log.getEndTxId());
 
-  boolean getEdit() {
-    return isGetEdit;
-  }
-
-  boolean getImage() {
-    return isGetImage;
-  }
-
-  boolean putImage() {
-    return isPutImage;
-  }
+    List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.EDITS, fileName);
+    assert !dstFiles.isEmpty() : "No checkpoint targets.";
+    
+    for (File f : dstFiles) {
+      if (f.exists() && f.canRead()) {
+        LOG.info("Skipping download of remote edit log " +
+            log + " since it already is stored locally at " + f);
+        return;
+      } else {
+        LOG.debug("Dest file: " + f);
+      }
+    }
 
-  CheckpointSignature getToken() {
-    return token;
+    getFileClient(fsName, fileid, dstFiles, dstStorage, false);
+    LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
+        dstFiles.get(0).length() + " bytes.");
   }
-
+ 
   /**
-   * Get the MD5 digest of the new image
-   * @return the MD5 digest of the new image
+   * Requests that the NameNode download an image from this node.
+   *
+   * @param fsName the http address for the remote NN
+   * @param imageListenAddress the host/port where the local node is running an
+   *                           HTTPServer hosting GetImageServlet
+   * @param storage the storage directory to transfer the image from
+   * @param txid the transaction ID of the image to be uploaded
    */
-  MD5Hash getNewChecksum() {
-    return newChecksum;
-  }
-  
-  String getInfoServer() throws IOException{
-    if (machineName == null || remoteport == 0) {
-      throw new IOException ("MachineName and port undefined");
-    }
-    return machineName + ":" + remoteport;
+  static void uploadImageFromStorage(String fsName,
+      InetSocketAddress imageListenAddress,
+      NNStorage storage, long txid) throws IOException {
+    
+    String fileid = GetImageServlet.getParamStringToPutImage(
+        txid, imageListenAddress, storage);
+    // this doesn't directly upload an image, but rather asks the NN
+    // to connect back to the 2NN to download the specified image.
+    TransferFsImage.getFileClient(fsName, fileid, null, null, false);
+    LOG.info("Uploaded image with txid " + txid + " to namenode at " +
+    		fsName);
   }
 
+  
   /**
    * A server-side method to respond to a getfile http request
    * Copies the contents of the local file into the output stream.
@@ -156,6 +148,13 @@ class TransferFsImage implements FSConst
         if (num <= 0) {
           break;
         }
+
+        if (ErrorSimulator.getErrorSimulation(4)) {
+          // Simulate a corrupted byte on the wire
+          LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!");
+          buf[0]++;
+        }
+        
         outstream.write(buf, 0, num);
         if (throttler != null) {
           throttler.throttle(num);
@@ -171,16 +170,17 @@ class TransferFsImage implements FSConst
   /**
    * Client-side Method to fetch file from a server
    * Copies the response from the URL to a list of local files.
-   * 
+   * @param dstStorage if an error occurs writing to one of the files,
+   *                   this storage object will be notified. 
    * @Return a digest of the received file if getChecksum is true
    */
-  static MD5Hash getFileClient(String fsName, String id, File[] localPath,
-      boolean getChecksum)
-    throws IOException {
+  static MD5Hash getFileClient(String nnHostPort,
+      String queryString, List<File> localPaths,
+      NNStorage dstStorage, boolean getChecksum) throws IOException {
     byte[] buf = new byte[BUFFER_SIZE];
     String proto = UserGroupInformation.isSecurityEnabled() ? "https://" : "http://";
-    StringBuilder str = new StringBuilder(proto+fsName+"/getimage?");
-    str.append(id);
+    StringBuilder str = new StringBuilder(proto+nnHostPort+"/getimage?");
+    str.append(queryString);
 
     //
     // open connection to remote server
@@ -189,7 +189,15 @@ class TransferFsImage implements FSConst
     
     // Avoid Krb bug with cross-realm hosts
     SecurityUtil.fetchServiceTicket(url);
-    URLConnection connection = url.openConnection();
+    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+    
+    if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+      throw new IOException(
+          "Image transfer servlet at " + url +
+          " failed with status code " + connection.getResponseCode() +
+          "\nResponse message:\n" + connection.getResponseMessage());
+    }
+    
     long advertisedSize;
     String contentLength = connection.getHeaderField(CONTENT_LENGTH);
     if (contentLength != null) {
@@ -198,6 +206,9 @@ class TransferFsImage implements FSConst
       throw new IOException(CONTENT_LENGTH + " header is not provided " +
                             "by the namenode when trying to fetch " + str);
     }
+    
+    MD5Hash advertisedDigest = parseMD5Header(connection);
+
     long received = 0;
     InputStream stream = connection.getInputStream();
     MessageDigest digester = null;
@@ -205,36 +216,47 @@ class TransferFsImage implements FSConst
       digester = MD5Hash.getDigester();
       stream = new DigestInputStream(stream, digester);
     }
-    FileOutputStream[] output = null;
     boolean finishedReceiving = false;
 
+    List<FileOutputStream> outputStreams = Lists.newArrayList();
+
     try {
-      if (localPath != null) {
-        output = new FileOutputStream[localPath.length];
-        for (int i = 0; i < output.length; i++) {
-          output[i] = new FileOutputStream(localPath[i]);
+      if (localPaths != null) {
+        for (File f : localPaths) {
+          try {
+            if (f.exists()) {
+              LOG.warn("Overwriting existing file " + f
+                  + " with file downloaded from " + str);
+            }
+            outputStreams.add(new FileOutputStream(f));
+          } catch (IOException ioe) {
+            LOG.warn("Unable to download file " + f, ioe);
+            dstStorage.reportErrorOnFile(f);
+          }
+        }
+        
+        if (outputStreams.isEmpty()) {
+          throw new IOException(
+              "Unable to download to any storage directory");
         }
       }
+      
       int num = 1;
       while (num > 0) {
         num = stream.read(buf);
-        if (num > 0 && localPath != null) {
+        if (num > 0) {
           received += num;
-          for (int i = 0; i < output.length; i++) {
-            output[i].write(buf, 0, num);
+          for (FileOutputStream fos : outputStreams) {
+            fos.write(buf, 0, num);
           }
         }
       }
       finishedReceiving = true;
     } finally {
       stream.close();
-      if (output != null) {
-        for (int i = 0; i < output.length; i++) {
-          if (output[i] != null) {
-            output[i].getChannel().force(true);
-            output[i].close();
-          }
-        }
+      for (FileOutputStream fos : outputStreams) {
+        fos.getChannel().force(true);
+        fos.close();
       }
       if (finishedReceiving && received != advertisedSize) {
         // only throw this exception if we think we read all of it on our end
@@ -245,6 +267,25 @@ class TransferFsImage implements FSConst
                               advertisedSize);
       }
     }
-    return digester==null ? null : new MD5Hash(digester.digest());
+
+    if (digester != null) {
+      MD5Hash computedDigest = new MD5Hash(digester.digest());
+      
+      if (advertisedDigest != null &&
+          !computedDigest.equals(advertisedDigest)) {
+        throw new IOException("File " + str + " computed digest " +
+            computedDigest + " does not match advertised digest " + 
+            advertisedDigest);
+      }
+      return computedDigest;
+    } else {
+      return null;
+    }    
   }
+
+  private static MD5Hash parseMD5Header(HttpURLConnection connection) {
+    String header = connection.getHeaderField(MD5_HEADER);
+    return (header != null) ? new MD5Hash(header) : null;
+  }
+
 }

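The download path now verifies integrity end-to-end: the server advertises an MD5 in the X-MD5-Digest header and getFileClient() recomputes it while streaming. The same pattern in isolation (imageUrl and the toHex helper are illustrative, not part of the patch):

    // Sketch: digest the stream while copying, then compare with the
    // digest advertised by the server.
    HttpURLConnection conn =
        (HttpURLConnection) new URL(imageUrl).openConnection();
    String advertised = conn.getHeaderField("X-MD5-Digest");

    MessageDigest md5 = MessageDigest.getInstance("MD5");
    InputStream in = new DigestInputStream(conn.getInputStream(), md5);
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) != -1) {
      // write n bytes to each destination file, as getFileClient() does
    }
    in.close();

    String computed = toHex(md5.digest());   // hypothetical hex helper
    if (advertised != null && !computed.equalsIgnoreCase(advertised)) {
      throw new IOException("Digest mismatch: advertised " + advertised
          + " but computed " + computed);
    }
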
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java Fri Jul 29 16:28:45 2011
@@ -47,19 +47,16 @@ import org.apache.hadoop.hdfs.server.nam
 @InterfaceStability.Evolving
 public class CheckpointCommand extends NamenodeCommand {
   private CheckpointSignature cSig;
-  private boolean isImageObsolete;
   private boolean needToReturnImage;
 
   public CheckpointCommand() {
-    this(null, false, false);
+    this(null, false);
   }
 
   public CheckpointCommand(CheckpointSignature sig,
-                           boolean isImgObsolete,
                            boolean needToReturnImg) {
     super(NamenodeProtocol.ACT_CHECKPOINT);
     this.cSig = sig;
-    this.isImageObsolete = isImgObsolete;
     this.needToReturnImage = needToReturnImg;
   }
 
@@ -72,16 +69,6 @@ public class CheckpointCommand extends N
   }
 
   /**
-   * Indicates whether current backup image is obsolete, and therefore
-   * need to be discarded?
-   * 
-   * @return true if current image should be discarded.
-   */
-  public boolean isImageObsolete() {
-    return isImageObsolete;
-  }
-
-  /**
    * Indicates whether the new checkpoint image needs to be transfered 
    * back to the name-node after the checkpoint is done.
    * 
@@ -104,7 +91,6 @@ public class CheckpointCommand extends N
   public void write(DataOutput out) throws IOException {
     super.write(out);
     cSig.write(out);
-    out.writeBoolean(isImageObsolete);
     out.writeBoolean(needToReturnImage);
   }
   
@@ -112,7 +98,6 @@ public class CheckpointCommand extends N
     super.readFields(in);
     cSig = new CheckpointSignature();
     cSig.readFields(in);
-    isImageObsolete = in.readBoolean();
     needToReturnImage = in.readBoolean();
   }
 }

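Because write() and readFields() no longer carry isImageObsolete, the Writable wire format of CheckpointCommand changes; that is part of why NamenodeProtocol.versionID is bumped in the next file. The contract in miniature (sig is an existing CheckpointSignature; a hedged sketch, not code from the patch):

    // A Writable must read back exactly the fields it wrote, in order,
    // so dropping a field is an incompatible on-wire change.
    CheckpointCommand cmd = new CheckpointCommand(sig, true);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    cmd.write(new DataOutputStream(bos));

    CheckpointCommand roundTripped = new CheckpointCommand();
    roundTripped.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));
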
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to journal edits to a remote node. Currently,
+ * this is used to publish edits from the NameNode to a BackupNode.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface JournalProtocol extends VersionedProtocol {
+  public static final long versionID = 1L;
+
+  /**
+   * Journal edit records.
+   * This message is sent by the active name-node to the backup node
+   * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
+   * changes with the backup namespace image.
+   * 
+   * @param registration active node registration
+   * @param firstTxnId the first transaction of this batch
+   * @param numTxns number of transactions
+   * @param records byte array containing serialized journal records
+   */
+  public void journal(NamenodeRegistration registration,
+                      long firstTxnId,
+                      int numTxns,
+                      byte[] records) throws IOException;
+
+  /**
+   * Notify the BackupNode that the NameNode has rolled its edit logs
+   * and is now writing a new log segment.
+   * @param registration the registration of the active NameNode
+   * @param txid the first txid in the new log
+   */
+  public void startLogSegment(NamenodeRegistration registration,
+      long txid) throws IOException;
+}
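
To make the calling contract concrete, here is a receiver-side sketch. It is hypothetical and not part of this patch: a plain class with the same two methods, ignoring the VersionedProtocol/RPC plumbing, that buffers one log segment in memory and checks that batches arrive without txid gaps.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;

    /** Hypothetical sketch: accumulates journaled edits for one log segment. */
    class InMemoryJournalReceiver {
      private final ByteArrayOutputStream segment = new ByteArrayOutputStream();
      private long expectedTxId = -1;  // next transaction we expect to receive

      public void startLogSegment(NamenodeRegistration registration, long txid) {
        segment.reset();       // the NameNode rolled: begin a fresh segment
        expectedTxId = txid;
      }

      public void journal(NamenodeRegistration registration, long firstTxnId,
                          int numTxns, byte[] records) throws IOException {
        if (expectedTxId != -1 && firstTxnId != expectedTxId) {
          throw new IOException("Transaction gap: expected " + expectedTxId
              + " but received " + firstTxnId);
        }
        segment.write(records);          // append the serialized batch
        expectedTxId = firstTxnId + numTxns;
      }
    }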

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java Fri Jul 29 16:28:45 2011
@@ -42,21 +42,14 @@ public interface NamenodeProtocol extend
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
    * 
-   * 5: Added one parameter to rollFSImage() and
-   *    changed the definition of CheckpointSignature
+   * 6: Switch to txid-based file naming for image and edits
    */
-  public static final long versionID = 5L;
+  public static final long versionID = 6L;
 
   // Error codes passed by errorReport().
   final static int NOTIFY = 0;
   final static int FATAL = 1;
 
-  // Journal action codes. See journal().
-  public static byte JA_IS_ALIVE = 100; // check whether the journal is alive
-  public static byte JA_JOURNAL      = 101; // just journal
-  public static byte JA_JSPOOL_START = 102;  // = FSEditLogOpCodes.OP_JSPOOL_START
-  public static byte JA_CHECKPOINT_TIME = 103; // = FSEditLogOpCodes.OP_CHECKPOINT_TIME
-
   public final static int ACT_UNKNOWN = 0;    // unknown action   
   public final static int ACT_SHUTDOWN = 50;   // shutdown node
   public final static int ACT_CHECKPOINT = 51;   // do checkpoint
@@ -84,14 +77,11 @@ public interface NamenodeProtocol extend
   public ExportedBlockKeys getBlockKeys() throws IOException;
 
   /**
-   * Get the size of the current edit log (in bytes).
-   * @return The number of bytes in the current edit log.
+   * @return The most recent transaction ID that has been synced to
+   * persistent storage.
    * @throws IOException
-   * @deprecated 
-   *    See {@link org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode}
    */
-  @Deprecated
-  public long getEditLogSize() throws IOException;
+  public long getTransactionID() throws IOException;
 
   /**
    * Closes the current edit log and opens a new one. The 
@@ -105,20 +95,6 @@ public interface NamenodeProtocol extend
   public CheckpointSignature rollEditLog() throws IOException;
 
   /**
-   * Rolls the fsImage log. It removes the old fsImage, copies the
-   * new image to fsImage, removes the old edits and renames edits.new 
-   * to edits. The call fails if any of the four files are missing.
-   * 
-   * @param sig the signature of this checkpoint (old fsimage)
-   * @throws IOException
-   * @deprecated 
-   *    See {@link org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode}
-   */
-  @Deprecated
-  public void rollFsImage(CheckpointSignature sig)
-  throws IOException;
-
-  /**
    * Request name-node version and storage information.
    * 
    * @return {@link NamespaceInfo} identifying versions and storage information 
@@ -177,31 +153,14 @@ public interface NamenodeProtocol extend
    */
   public void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException;
-
+  
+  
   /**
-   * Get the size of the active name-node journal (edit log) in bytes.
-   * 
-   * @param registration the requesting node
-   * @return The number of bytes in the journal.
-   * @throws IOException
-   */
-  public long journalSize(NamenodeRegistration registration) throws IOException;
-
-  /**
-   * Journal edit records.
-   * This message is sent by the active name-node to the backup node
-   * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
-   * changes with the backup namespace image.
-   * 
-   * @param registration active node registration
-   * @param jAction journal action
-   * @param length length of the byte array
-   * @param records byte array containing serialized journal records
-   * @throws IOException
+   * Return a structure containing details about all edit logs
+   * available to be fetched from the NameNode.
+   * @param sinceTxId return only logs that contain transactions >= sinceTxId
    */
-  public void journal(NamenodeRegistration registration,
-                      int jAction,
-                      int length,
-                      byte[] records) throws IOException;
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+    throws IOException;
 }
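
Taken together, getTransactionID(), rollEditLog(), and getEditLogManifest() replace the old byte-counting checkpoint handshake with a txid-based one. A hypothetical checkpointer loop, assuming a NamenodeProtocol proxy is already in hand (fetchEditLog stands in for the actual segment transfer, which this patch does not show):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

    /** Hypothetical checkpointer sketch; not the actual SecondaryNameNode code. */
    class CheckpointerSketch {
      static void catchUp(NamenodeProtocol namenode, long lastCheckpointTxId)
          throws IOException {
        // Only ask for segments containing transactions we have not yet applied.
        RemoteEditLogManifest manifest =
            namenode.getEditLogManifest(lastCheckpointTxId + 1);
        for (RemoteEditLog log : manifest.getLogs()) {
          fetchEditLog(log.getStartTxId(), log.getEndTxId());
        }
      }

      private static void fetchEditLog(long startTxId, long endTxId) {
        // hypothetical: download the segment for [startTxId, endTxId]
        // from the NameNode, e.g. over HTTP
      }
    }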
 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java Fri Jul 29 16:28:45 2011
@@ -43,7 +43,6 @@ implements NodeRegistration {
   String rpcAddress;          // RPC address of the node
   String httpAddress;         // HTTP address of the node
   NamenodeRole role;          // node role
-  long checkpointTime = -1L;  // the age of the image
 
   public NamenodeRegistration() {
     super();
@@ -52,14 +51,12 @@ implements NodeRegistration {
   public NamenodeRegistration(String address,
                               String httpAddress,
                               StorageInfo storageInfo,
-                              NamenodeRole role,
-                              long checkpointTime) {
+                              NamenodeRole role) {
     super();
     this.rpcAddress = address;
     this.httpAddress = httpAddress;
     this.setStorageInfo(storageInfo);
     this.role = role;
-    this.checkpointTime = checkpointTime;
   }
 
   @Override // NodeRegistration
@@ -96,13 +93,6 @@ implements NodeRegistration {
     return role.equals(that);
   }
 
-  /**
-   * Get the age of the image.
-   */
-  public long getCheckpointTime() {
-    return checkpointTime;
-  }
-
   /////////////////////////////////////////////////
   // Writable
   /////////////////////////////////////////////////
@@ -120,7 +110,6 @@ implements NodeRegistration {
     Text.writeString(out, httpAddress);
     Text.writeString(out, role.name());
     super.write(out);
-    out.writeLong(checkpointTime);
   }
 
   @Override // Writable
@@ -129,6 +118,5 @@ implements NodeRegistration {
     httpAddress = Text.readString(in);
     role = NamenodeRole.valueOf(Text.readString(in));
     super.readFields(in);
-    checkpointTime = in.readLong();
   }
 }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java Fri Jul 29 16:28:45 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -98,4 +99,26 @@ public class NamespaceInfo extends Stora
   public String toString(){
     return super.toString() + ";bpid=" + blockPoolID;
   }
+
+  public void validateStorage(NNStorage storage) throws IOException {
+    if (layoutVersion != storage.getLayoutVersion() ||
+        namespaceID != storage.getNamespaceID() ||
+        cTime != storage.cTime ||
+        !clusterID.equals(storage.getClusterID()) ||
+        !blockPoolID.equals(storage.getBlockPoolID())) {
+      throw new IOException("Inconsistent namespace information:\n" +
+          "NamespaceInfo has:\n" +
+          "LV=" + layoutVersion + ";" +
+          "NS=" + namespaceID + ";" +
+          "cTime=" + cTime + ";" +
+          "CID=" + clusterID + ";" +
+          "BPID=" + blockPoolID +
+          ".\nStorage has:\n" +
+          "LV=" + storage.getLayoutVersion() + ";" +
+          "NS=" + storage.getNamespaceID() + ";" +
+          "cTime=" + storage.getCTime() + ";" +
+          "CID=" + storage.getClusterID() + ";" +
+          "BPID=" + storage.getBlockPoolID() + ".");
+    }
+  }
 }
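
A hypothetical call site for the new check: a node fetches NamespaceInfo via the versionRequest() handshake and validates it against its local storage before participating. The wiring below is illustrative only; the real call sites live elsewhere in the NameNode/BackupNode code.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.namenode.NNStorage;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    /** Illustrative handshake sketch. */
    class HandshakeSketch {
      static void handshake(NamenodeProtocol namenode, NNStorage localStorage)
          throws IOException {
        NamespaceInfo nsInfo = namenode.versionRequest();
        // Throws IOException with a field-by-field message on any mismatch.
        nsInfo.validateStorage(localStorage);
      }
    }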

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.Writable;
+
+public class RemoteEditLog implements Writable {
+  private long startTxId = FSConstants.INVALID_TXID;
+  private long endTxId = FSConstants.INVALID_TXID;
+  
+  public RemoteEditLog() {
+  }
+
+  public RemoteEditLog(long startTxId, long endTxId) {
+    this.startTxId = startTxId;
+    this.endTxId = endTxId;
+  }
+
+  public long getStartTxId() {
+    return startTxId;
+  }
+
+  public long getEndTxId() {
+    return endTxId;
+  }
+    
+  @Override
+  public String toString() {
+    return "[" + startTxId + "," + endTxId + "]";
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(startTxId);
+    out.writeLong(endTxId);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    startTxId = in.readLong();
+    endTxId = in.readLong();
+  }
+
+}
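
Since RemoteEditLog is a plain Writable, a round trip needs nothing beyond JDK streams. A small self-contained check (txid values arbitrary):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;

    class RemoteEditLogRoundTrip {
      public static void main(String[] args) throws IOException {
        RemoteEditLog original = new RemoteEditLog(101, 200);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        RemoteEditLog copy = new RemoteEditLog();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy);  // prints [101,200]
      }
    }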

Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * An enumeration of logs available on a remote NameNode.
+ */
+public class RemoteEditLogManifest implements Writable {
+
+  private List<RemoteEditLog> logs;
+  
+  public RemoteEditLogManifest() {
+  }
+  
+  public RemoteEditLogManifest(List<RemoteEditLog> logs) {
+    this.logs = logs;
+    checkState();
+  }
+  
+  
+  /**
+   * Check that the logs are contiguous and non-overlapping
+   * sequences of transactions, in sorted order
+   * @throws IllegalStateException if incorrect
+   */
+  private void checkState()  {
+    Preconditions.checkNotNull(logs);
+    
+    RemoteEditLog prev = null;
+    for (RemoteEditLog log : logs) {
+      if (prev != null) {
+        if (log.getStartTxId() != prev.getEndTxId() + 1) {
+          throw new IllegalStateException("Invalid log manifest:" + this);
+        }
+      }
+      
+      prev = log;
+    }
+  }
+  
+  public List<RemoteEditLog> getLogs() {
+    return Collections.unmodifiableList(logs);
+  }
+
+
+  
+  @Override
+  public String toString() {
+    return "[" + Joiner.on(", ").join(logs) + "]";
+  }
+  
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(logs.size());
+    for (RemoteEditLog log : logs) {
+      log.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numLogs = in.readInt();
+    logs = Lists.newArrayList();
+    for (int i = 0; i < numLogs; i++) {
+      RemoteEditLog log = new RemoteEditLog();
+      log.readFields(in);
+      logs.add(log);
+    }
+    checkState();
+  }
+}
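
The contiguity invariant enforced by checkState() is easiest to see with concrete values (txids arbitrary):

    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

    import com.google.common.collect.Lists;

    class ManifestSketch {
      public static void main(String[] args) {
        // Valid: segment [101,200] starts exactly one past the end of [1,100].
        new RemoteEditLogManifest(Lists.newArrayList(
            new RemoteEditLog(1, 100),
            new RemoteEditLog(101, 200)));

        // Invalid: txid 101 is missing, so the constructor throws
        // IllegalStateException("Invalid log manifest:...").
        new RemoteEditLogManifest(Lists.newArrayList(
            new RemoteEditLog(1, 100),
            new RemoteEditLog(102, 200)));
      }
    }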

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java Fri Jul 29 16:28:45 2011
@@ -31,6 +31,7 @@ public enum EditsElement {
   EDITS_VERSION,
   RECORD,
   OPCODE,
+  TRANSACTION_ID,
   DATA,
     // elements in the data part of the editLog records
     LENGTH,

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java Fri Jul 29 16:28:45 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
 
+import java.io.EOFException;
 import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -40,7 +41,7 @@ import static org.apache.hadoop.hdfs.too
 class EditsLoaderCurrent implements EditsLoader {
 
   private static int[] supportedVersions = { -18, -19, -20, -21, -22, -23, -24,
-      -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36 };
+      -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38};
 
   private EditsVisitor v;
   private int editsVersion = 0;
@@ -65,10 +66,18 @@ class EditsLoaderCurrent implements Edit
   }
 
   /**
+   * Visit a transaction ID, if the log version supports it.
+   */
+  private void visitTxId() throws IOException {
+    if (LayoutVersion.supports(Feature.STORED_TXIDS, editsVersion)) {
+      v.visitLong(EditsElement.TRANSACTION_ID);
+    }
+  }
+  
+  /**
    * Visit OP_INVALID
    */
   private void visit_OP_INVALID() throws IOException {
-    ; // nothing to do, this op code has no data
   }
 
   /**
@@ -92,6 +101,7 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_ADD_or_OP_CLOSE(FSEditLogOpCodes editsOpCode)
     throws IOException {
+    visitTxId();
 
     IntToken opAddLength = v.visitInt(EditsElement.LENGTH);
     // this happens if the edits is not properly ended (-1 op code),
@@ -135,6 +145,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_RENAME_OLD
    */
   private void visit_OP_RENAME_OLD() throws IOException {
+    visitTxId();
+
     v.visitInt(        EditsElement.LENGTH);
     v.visitStringUTF8( EditsElement.SOURCE);
     v.visitStringUTF8( EditsElement.DESTINATION);
@@ -145,6 +157,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_DELETE
    */
   private void visit_OP_DELETE() throws IOException {
+    visitTxId();
+
     v.visitInt(        EditsElement.LENGTH);
     v.visitStringUTF8( EditsElement.PATH);
     v.visitStringUTF8( EditsElement.TIMESTAMP);
@@ -154,6 +168,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_MKDIR
    */
   private void visit_OP_MKDIR() throws IOException {
+    visitTxId();
+
     v.visitInt(        EditsElement.LENGTH);
     v.visitStringUTF8( EditsElement.PATH);
     v.visitStringUTF8( EditsElement.TIMESTAMP);
@@ -172,6 +188,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_REPLICATION
    */
   private void visit_OP_SET_REPLICATION() throws IOException {
+    visitTxId();
+
     v.visitStringUTF8(EditsElement.PATH);
     v.visitStringUTF8(EditsElement.REPLICATION);
   }
@@ -180,6 +198,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_PERMISSIONS
    */
   private void visit_OP_SET_PERMISSIONS() throws IOException {
+    visitTxId();
+
     v.visitStringUTF8( EditsElement.PATH);
     v.visitShort(      EditsElement.FS_PERMISSIONS);
   }
@@ -188,6 +208,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_OWNER
    */
   private void visit_OP_SET_OWNER() throws IOException {
+    visitTxId();
+
     v.visitStringUTF8(EditsElement.PATH);
     v.visitStringUTF8(EditsElement.USERNAME);
     v.visitStringUTF8(EditsElement.GROUPNAME);
@@ -197,6 +219,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_GENSTAMP
    */
   private void visit_OP_SET_GENSTAMP() throws IOException {
+    visitTxId();
+
     v.visitLong(EditsElement.GENERATION_STAMP);
   }
 
@@ -204,6 +228,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_TIMES
    */
   private void visit_OP_TIMES() throws IOException {
+    visitTxId();
+
     v.visitInt(        EditsElement.LENGTH);
     v.visitStringUTF8( EditsElement.PATH);
     v.visitStringUTF8( EditsElement.MTIME);
@@ -214,6 +240,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_QUOTA
    */
   private void visit_OP_SET_QUOTA() throws IOException {
+    visitTxId();
+
     v.visitStringUTF8( EditsElement.PATH);
     v.visitLong(       EditsElement.NS_QUOTA);
     v.visitLong(       EditsElement.DS_QUOTA);
@@ -223,6 +251,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_RENAME
    */
   private void visit_OP_RENAME() throws IOException {
+    visitTxId();
+
     v.visitInt(           EditsElement.LENGTH);
     v.visitStringUTF8(    EditsElement.SOURCE);
     v.visitStringUTF8(    EditsElement.DESTINATION);
@@ -234,6 +264,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_CONCAT_DELETE
    */
   private void visit_OP_CONCAT_DELETE() throws IOException {
+    visitTxId();
+
     IntToken lengthToken = v.visitInt(EditsElement.LENGTH);
     v.visitStringUTF8(EditsElement.CONCAT_TARGET);
      // all except CONCAT_TARGET and TIMESTAMP
@@ -248,6 +280,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SYMLINK
    */
   private void visit_OP_SYMLINK() throws IOException {
+    visitTxId();
+
     v.visitInt(        EditsElement.LENGTH);
     v.visitStringUTF8( EditsElement.SOURCE);
     v.visitStringUTF8( EditsElement.DESTINATION);
@@ -267,6 +301,8 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_GET_DELEGATION_TOKEN
    */
   private void visit_OP_GET_DELEGATION_TOKEN() throws IOException {
+    visitTxId();
+    
       v.visitByte(       EditsElement.T_VERSION);
       v.visitStringText( EditsElement.T_OWNER);
       v.visitStringText( EditsElement.T_RENEWER);
@@ -283,6 +319,8 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_RENEW_DELEGATION_TOKEN()
     throws IOException {
+    visitTxId();
+
       v.visitByte(       EditsElement.T_VERSION);
       v.visitStringText( EditsElement.T_OWNER);
       v.visitStringText( EditsElement.T_RENEWER);
@@ -299,6 +337,8 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_CANCEL_DELEGATION_TOKEN()
     throws IOException {
+    visitTxId();
+
       v.visitByte(       EditsElement.T_VERSION);
       v.visitStringText( EditsElement.T_OWNER);
       v.visitStringText( EditsElement.T_RENEWER);
@@ -314,6 +354,8 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_UPDATE_MASTER_KEY()
     throws IOException {
+    visitTxId();
+    
       v.visitVInt(  EditsElement.KEY_ID);
       v.visitVLong( EditsElement.KEY_EXPIRY_DATE);
       VIntToken blobLengthToken = v.visitVInt(EditsElement.KEY_LENGTH);
@@ -322,11 +364,29 @@ class EditsLoaderCurrent implements Edit
   
   private void visit_OP_REASSIGN_LEASE()
     throws IOException {
+    visitTxId();
+
       v.visitStringUTF8(EditsElement.CLIENT_NAME);
       v.visitStringUTF8(EditsElement.PATH);
       v.visitStringUTF8(EditsElement.CLIENT_NAME);
   }
 
+  /**
+   * Visit OP_BEGIN_LOG_SEGMENT
+   */
+  private void visit_OP_BEGIN_LOG_SEGMENT()
+    throws IOException {
+    visitTxId();
+  }
+  
+  /**
+   * Visit OP_END_LOG_SEGMENT
+   */
+  private void visit_OP_END_LOG_SEGMENT()
+    throws IOException {
+    visitTxId();
+  }
+
   private void visitOpCode(FSEditLogOpCodes editsOpCode)
     throws IOException {
 
@@ -391,6 +451,12 @@ class EditsLoaderCurrent implements Edit
       case OP_REASSIGN_LEASE: // 22
         visit_OP_REASSIGN_LEASE();
         break;
+      case OP_END_LOG_SEGMENT: // 23
+        visit_OP_END_LOG_SEGMENT();
+        break;        
+      case OP_START_LOG_SEGMENT: // 24
+        visit_OP_BEGIN_LOG_SEGMENT();
+        break;
       default:
       {
         throw new IOException("Unknown op code " + editsOpCode);
@@ -419,7 +485,17 @@ class EditsLoaderCurrent implements Edit
       do {
         v.visitEnclosingElement(EditsElement.RECORD);
 
-        ByteToken opCodeToken = v.visitByte(EditsElement.OPCODE);
+        ByteToken opCodeToken;
+        try {
+          opCodeToken = v.visitByte(EditsElement.OPCODE);
+        } catch (EOFException eof) {
+          // Getting EOF when reading the opcode is fine --
+          // it's just a finalized edits file
+          // Just fake the OP_INVALID here.
+          opCodeToken = new ByteToken(EditsElement.OPCODE);
+          opCodeToken.fromByte(FSEditLogOpCodes.OP_INVALID.getOpCode());
+          v.visit(opCodeToken);
+        }
         editsOpCode = FSEditLogOpCodes.fromByte(opCodeToken.value);
 
         v.visitEnclosingElement(EditsElement.DATA);
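
The try/catch above treats EOF on an opcode boundary as the normal end of a finalized edits file rather than as corruption. Reduced to a standalone sketch (the -1 sentinel here is an assumption about what FSEditLogOpCodes.OP_INVALID maps to):

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;

    class OpCodeReaderSketch {
      static final byte OP_INVALID = -1;  // assumed value of the sentinel opcode

      /** Returns the next opcode, or OP_INVALID if the stream simply ends. */
      static byte nextOpCode(DataInputStream in) throws IOException {
        try {
          return in.readByte();
        } catch (EOFException eof) {
          // Finalized edits files may end without an explicit terminator;
          // EOF exactly at an opcode boundary is a clean end-of-log.
          return OP_INVALID;
        }
      }
    }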

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsViewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsViewer.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsViewer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsViewer.java Fri Jul 29 16:28:45 2011
@@ -17,18 +17,12 @@
  */
 package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/Tokenizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/Tokenizer.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/Tokenizer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/Tokenizer.java Fri Jul 29 16:28:45 2011
@@ -104,6 +104,10 @@ interface Tokenizer {
     public void fromBinary(DataInputStream in) throws IOException {
       value = in.readByte();
     }
+    
+    public void fromByte(byte b) {
+      value = b;
+    }
 
     @Override
     public String toString() {

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Jul 29 16:28:45 2011
@@ -122,7 +122,7 @@ class ImageLoaderCurrent implements Imag
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
-      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36 };
+      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38};
   private int imageVersion = 0;
 
   /* (non-Javadoc)
@@ -157,6 +157,10 @@ class ImageLoaderCurrent implements Imag
 
       v.visit(ImageElement.GENERATION_STAMP, in.readLong());
 
+      if (LayoutVersion.supports(Feature.STORED_TXIDS, imageVersion)) {
+        v.visit(ImageElement.TRANSACTION_ID, in.readLong());
+      }
+
       if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imageVersion)) {
         boolean isCompressed = in.readBoolean();
        v.visit(ImageElement.IS_COMPRESSED, String.valueOf(isCompressed));

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java Fri Jul 29 16:28:45 2011
@@ -71,7 +71,8 @@ abstract class ImageVisitor {
     NUM_DELEGATION_TOKENS,
     DELEGATION_TOKENS,
     DELEGATION_TOKEN_IDENTIFIER,
-    DELEGATION_TOKEN_EXPIRY_TIME
+    DELEGATION_TOKEN_EXPIRY_TIME,
+    TRANSACTION_ID
   }
   
   /**


