hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1576130 [4/7] - in /hadoop/common/branches/branch-2.4/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hado...
Date: Mon, 10 Mar 2014 23:40:24 GMT
Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Mar 10 23:40:21 2014
@@ -17,28 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
-import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -46,37 +24,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
-import org.apache.hadoop.hdfs.server.datanode.Replica;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -90,6 +43,15 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.Executor;
+
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
  * has a unique name and an extent on disk.
@@ -193,6 +155,7 @@ class FsDatasetImpl implements FsDataset
   }
     
   final DataNode datanode;
+  final DataStorage dataStorage;
   final FsVolumeList volumes;
   final FsDatasetAsyncDiskService asyncDiskService;
   final FsDatasetCache cacheManager;
@@ -209,6 +172,7 @@ class FsDatasetImpl implements FsDataset
   FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
       ) throws IOException {
     this.datanode = datanode;
+    this.dataStorage = storage;
     // The number of volumes required for operation is the total number 
     // of volumes minus the number of failed volumes we can tolerate.
     final int volFailuresTolerated =
@@ -1234,7 +1198,8 @@ class FsDatasetImpl implements FsDataset
       // finishes.
       asyncDiskService.deleteAsync(v, f,
           FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
-          new ExtendedBlock(bpid, invalidBlks[i]));
+          new ExtendedBlock(bpid, invalidBlks[i]),
+          dataStorage.getTrashDirectoryForBlockFile(bpid, f));
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
@@ -1762,11 +1727,13 @@ class FsDatasetImpl implements FsDataset
   }
   
   @Override
-  public synchronized void addBlockPool(String bpid, Configuration conf)
+  public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
     LOG.info("Adding block pool " + bpid);
-    volumes.addBlockPool(bpid, conf);
-    volumeMap.initBlockPool(bpid);
+    synchronized(this) {
+      volumes.addBlockPool(bpid, conf);
+      volumeMap.initBlockPool(bpid);
+    }
     volumes.getAllVolumesMap(bpid, volumeMap);
   }
 
@@ -1896,6 +1863,21 @@ class FsDatasetImpl implements FsDataset
   }
 
   @Override
+  public void enableTrash(String bpid) {
+    dataStorage.enableTrash(bpid);
+  }
+
+  @Override
+  public void restoreTrash(String bpid) {
+    dataStorage.restoreTrash(bpid);
+  }
+
+  @Override
+  public boolean trashEnabled(String bpid) {
+    return dataStorage.trashEnabled(bpid);
+  }
+
+  @Override
   public RollingLogs createRollingLogs(String bpid, String prefix
       ) throws IOException {
     String dir = null;
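
A note on the FsDatasetImpl hunks above: invalidate() now passes a per-block-pool trash directory to the async deleter, so block files can be moved aside instead of destroyed while a rolling upgrade is in progress, and addBlockPool() holds the dataset lock only for the in-memory setup so that the long disk scan in getAllVolumesMap() can run unsynchronized. A minimal sketch of that narrowed-locking shape, with illustrative names rather than the Hadoop API:

    import java.io.IOException;

    class DatasetSketch {
      private final Object datasetLock = new Object();

      void addBlockPool(String bpid) throws IOException {
        synchronized (datasetLock) {
          registerPool(bpid);  // cheap in-memory bookkeeping stays under the lock
        }
        scanVolumes(bpid);     // expensive disk scan runs outside the lock
      }

      private void registerPool(String bpid) { /* build per-pool maps */ }
      private void scanVolumes(String bpid) throws IOException { /* read disks */ }
    }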

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Mon Mar 10 23:40:21 2014
@@ -96,10 +96,41 @@ class FsVolumeList {
     }
   }
   
-  void getAllVolumesMap(String bpid, ReplicaMap volumeMap) throws IOException {
+  void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
     long totalStartTime = System.currentTimeMillis();
-    for (FsVolumeImpl v : volumes) {
-      getVolumeMap(bpid, v, volumeMap);
+    final List<IOException> exceptions = Collections.synchronizedList(
+        new ArrayList<IOException>());
+    List<Thread> replicaAddingThreads = new ArrayList<Thread>();
+    for (final FsVolumeImpl v : volumes) {
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
+                bpid + " on volume " + v + "...");
+            long startTime = System.currentTimeMillis();
+            v.getVolumeMap(bpid, volumeMap);
+            long timeTaken = System.currentTimeMillis() - startTime;
+            FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+                + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
+          } catch (IOException ioe) {
+            FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
+                "from " + v + ". Will throw later.", ioe);
+            exceptions.add(ioe);
+          }
+        }
+      };
+      replicaAddingThreads.add(t);
+      t.start();
+    }
+    for (Thread t : replicaAddingThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw exceptions.get(0);
     }
     long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
@@ -219,4 +250,4 @@ class FsVolumeList {
       }
     }
   }
-}
\ No newline at end of file
+}
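
The getAllVolumesMap() rewrite above replaces a sequential per-volume scan with one thread per volume, collects IOExceptions in a synchronized list, and rethrows the first failure once every thread has joined. The same scatter/gather shape in a self-contained sketch (Volume and scan() are placeholders, not Hadoop types):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class ParallelScanSketch {
      interface Volume { void scan() throws IOException; }

      static void scanAll(List<Volume> volumes) throws IOException {
        final List<IOException> exceptions =
            Collections.synchronizedList(new ArrayList<IOException>());
        List<Thread> threads = new ArrayList<Thread>();
        for (final Volume v : volumes) {
          Thread t = new Thread() {
            public void run() {
              try {
                v.scan();
              } catch (IOException ioe) {
                exceptions.add(ioe);  // record it; rethrow on the caller's thread
              }
            }
          };
          threads.add(t);
          t.start();
        }
        for (Thread t : threads) {
          try {
            t.join();                 // wait for all volumes before failing
          } catch (InterruptedException ie) {
            throw new IOException(ie);
          }
        }
        if (!exceptions.isEmpty()) {
          throw exceptions.get(0);    // surface the first recorded failure
        }
      }
    }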

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Mon Mar 10 23:40:21 2014
@@ -221,7 +221,7 @@ public class BackupImage extends FSImage
       backupInputStream.setBytes(data, logVersion);
 
       long numTxnsAdvanced = logLoader.loadEditRecords(
-          backupInputStream, true, lastAppliedTxId + 1, null);
+          backupInputStream, true, lastAppliedTxId + 1, null, null);
       if (numTxnsAdvanced != numTxns) {
         throw new IOException("Batch of txns starting at txnid " +
             firstTxId + " was supposed to contain " + numTxns +
@@ -279,7 +279,7 @@ public class BackupImage extends FSImage
           editStreams.add(s);
         }
       }
-      loadEdits(editStreams, namesystem, null);
+      loadEdits(editStreams, namesystem);
     }
     
     // now, need to load the in-progress file
@@ -315,7 +315,7 @@ public class BackupImage extends FSImage
         
         FSEditLogLoader loader =
             new FSEditLogLoader(namesystem, lastAppliedTxId);
-        loader.loadFSEdits(stream, lastAppliedTxId + 1, null);
+        loader.loadFSEdits(stream, lastAppliedTxId + 1);
         lastAppliedTxId = loader.getLastAppliedTxId();
         assert lastAppliedTxId == getEditLog().getLastWrittenTxId();
       } finally {

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Mon Mar 10 23:40:21 2014
@@ -97,4 +97,9 @@ class BackupJournalManager implements Jo
   public String toString() {
     return "BackupJournalManager";
   }
+
+  @Override
+  public void discardSegments(long startTxId) throws IOException {
+    throw new UnsupportedOperationException();
+  }
 }

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Mon Mar 10 23:40:21 2014
@@ -30,12 +30,14 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
@@ -44,8 +46,8 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -357,7 +359,7 @@ public class BackupNode extends NameNode
     } else {
       nsInfo.validateStorage(storage);
     }
-    bnImage.initEditLog();
+    bnImage.initEditLog(StartupOption.REGULAR);
     setRegistration();
     NamenodeRegistration nnReg = null;
     while(!isStopRequested()) {
@@ -402,9 +404,9 @@ public class BackupNode extends NameNode
       LOG.fatal(errorMsg);
       throw new IOException(errorMsg);
     }
-    assert HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+    assert HdfsConstants.NAMENODE_LAYOUT_VERSION == nsInfo.getLayoutVersion() :
       "Active and backup node layout versions must be the same. Expected: "
-      + HdfsConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
+      + HdfsConstants.NAMENODE_LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
     return nsInfo;
   }
 

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Mon Mar 10 23:40:21 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 
 import com.google.common.collect.ComparisonChain;
@@ -47,6 +48,7 @@ public class CheckpointSignature extends
   }
 
   CheckpointSignature(String str) {
+    super(NodeType.NAME_NODE);
     String[] fields = str.split(FIELD_SEPARATOR);
     assert fields.length == NUM_FIELDS :
       "Must be " + NUM_FIELDS + " fields in CheckpointSignature";

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Mon Mar 10 23:40:21 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -220,9 +221,9 @@ class Checkpointer extends Daemon {
         LOG.info("Unable to roll forward using only logs. Downloading " +
             "image with txid " + sig.mostRecentCheckpointTxId);
         MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
-            backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId,
-            bnStorage, true);
-        bnImage.saveDigestAndRenameCheckpointImage(
+            backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
+            true);
+        bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
             sig.mostRecentCheckpointTxId, downloadedHash);
         lastApplied = sig.mostRecentCheckpointTxId;
         needReloadImage = true;
@@ -240,7 +241,8 @@ class Checkpointer extends Daemon {
 
       if(needReloadImage) {
         LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
-        File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
+        File file = bnStorage.findImageFile(NameNodeFile.IMAGE,
+            sig.mostRecentCheckpointTxId);
         bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
       }
       rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
@@ -263,7 +265,7 @@ class Checkpointer extends Daemon {
     if(cpCmd.needToReturnImage()) {
       TransferFsImage.uploadImageFromStorage(
           backupNode.nnHttpAddress, getImageListenAddress(),
-          bnStorage, txid);
+          bnStorage, NameNodeFile.IMAGE, txid);
     }
 
     getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig);
@@ -307,6 +309,6 @@ class Checkpointer extends Daemon {
     }
     LOG.info("Checkpointer about to load edits from " +
         editsStreams.size() + " stream(s).");
-    dstImage.loadEdits(editsStreams, dstNamesystem, null);
+    dstImage.loadEdits(editsStreams, dstNamesystem);
   }
 }
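
The Checkpointer changes above thread a NameNodeFile type through the download, save, and upload paths, so the checkpoint code can distinguish a regular image from a rollback image created for rolling upgrade. A sketch of the underlying idea, with filename prefixes that are assumptions here rather than quoted from NNStorage:

    // Each image type gets its own filename prefix, so a regular checkpoint
    // and a rollback image can coexist in the same storage directory.
    enum ImageFileSketch {
      IMAGE("fsimage"),
      IMAGE_ROLLBACK("fsimage_rollback");

      private final String prefix;
      ImageFileSketch(String prefix) { this.prefix = prefix; }

      String fileName(long txid) {
        return prefix + "_" + String.format("%019d", txid);
      }
    }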

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Mon Mar 10 23:40:21 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -150,7 +149,8 @@ public class EditLogFileInputStream exte
       } catch (EOFException eofe) {
         throw new LogHeaderCorruptException("No header found in log");
       }
-      if (LayoutVersion.supports(Feature.ADD_LAYOUT_FLAGS, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.ADD_LAYOUT_FLAGS, logVersion)) {
         try {
           LayoutFlags.read(dataIn);
         } catch (EOFException eofe) {
@@ -329,12 +329,12 @@ public class EditLogFileInputStream exte
       throw new LogHeaderCorruptException(
           "Reached EOF when reading log header");
     }
-    if (logVersion < HdfsConstants.LAYOUT_VERSION || // future version
+    if (logVersion < HdfsConstants.NAMENODE_LAYOUT_VERSION || // future version
         logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION) { // unsupported
       throw new LogHeaderCorruptException(
           "Unexpected version of the file system log file: "
           + logVersion + ". Current version = "
-          + HdfsConstants.LAYOUT_VERSION + ".");
+          + HdfsConstants.NAMENODE_LAYOUT_VERSION + ".");
     }
     return logVersion;
   }
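
Context for the readLogVersion() change above: HDFS layout versions are negative integers that grow more negative as the format evolves, and this commit replaces the single HdfsConstants.LAYOUT_VERSION with a separate NAMENODE_LAYOUT_VERSION (plus a datanode counterpart) so the two roles can advance independently during a rolling upgrade. The check rejects a version that is newer (more negative) than the running namenode understands, or older than the last upgradable format. The same window check as a sketch with made-up constant values:

    import java.io.IOException;

    class LogVersionCheckSketch {
      static final int CURRENT_LAYOUT_VERSION = -56;         // illustrative
      static final int LAST_UPGRADABLE_LAYOUT_VERSION = -16; // illustrative

      static int checkLogVersion(int logVersion) throws IOException {
        if (logVersion < CURRENT_LAYOUT_VERSION               // future release
            || logVersion > LAST_UPGRADABLE_LAYOUT_VERSION) { // too old
          throw new IOException("Unexpected log version " + logVersion
              + "; current version = " + CURRENT_LAYOUT_VERSION);
        }
        return logVersion;
      }
    }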

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Mon Mar 10 23:40:21 2014
@@ -132,7 +132,7 @@ public class EditLogFileOutputStream ext
    */
   @VisibleForTesting
   public static void writeHeader(DataOutputStream out) throws IOException {
-    out.writeInt(HdfsConstants.LAYOUT_VERSION);
+    out.writeInt(HdfsConstants.NAMENODE_LAYOUT_VERSION);
     LayoutFlags.write(out);
   }
 

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Mon Mar 10 23:40:21 2014
@@ -20,36 +20,36 @@ package org.apache.hadoop.hdfs.server.na
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.Time.now;
 
-import java.net.URI;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.lang.reflect.Constructor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@@ -66,8 +66,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -91,7 +92,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -1036,6 +1036,18 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
 
+  void logStartRollingUpgrade(long startTime) {
+    RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get());
+    op.setTime(startTime);
+    logEdit(op);
+  }
+
+  void logFinalizeRollingUpgrade(long finalizeTime) {
+    RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get());
+    op.setTime(finalizeTime);
+    logEdit(op);
+  }
+
   /**
    * Get all the journals this edit log is currently operating on.
    */
@@ -1318,6 +1330,13 @@ public class FSEditLog implements LogsPu
     }
   }
   
+  public synchronized void discardSegments(long markerTxid)
+      throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      jas.getManager().discardSegments(markerTxid);
+    }
+  }
+
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxId, boolean inProgressOk) throws IOException {
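
The new logStartRollingUpgrade()/logFinalizeRollingUpgrade() methods above follow the usual FSEditLog pattern: take a reusable op instance from the per-thread OpInstanceCache, set its payload (here just a timestamp), and append it with logEdit(); discardSegments() simply fans the call out to every journal in the set. A rough sketch of that op-cache shape with stand-in names:

    class EditLogSketch {
      static class UpgradeMarkerOp {
        long time;
        UpgradeMarkerOp setTime(long t) { this.time = t; return this; }
      }

      // One mutable op instance per thread, reused across log calls.
      private final ThreadLocal<UpgradeMarkerOp> cache =
          new ThreadLocal<UpgradeMarkerOp>() {
            protected UpgradeMarkerOp initialValue() {
              return new UpgradeMarkerOp();
            }
          };

      void logStartRollingUpgrade(long startTime) {
        logEdit(cache.get().setTime(startTime));
      }

      void logEdit(UpgradeMarkerOp op) { /* append + sync to all journals */ }
    }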

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Mar 10 23:40:21 2014
@@ -37,10 +37,11 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetNSQuotaOp;
@@ -81,6 +83,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
@@ -96,22 +99,30 @@ import com.google.common.base.Preconditi
 @InterfaceStability.Evolving
 public class FSEditLogLoader {
   static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
-  static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
+  static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
+
   private final FSNamesystem fsNamesys;
   private long lastAppliedTxId;
+  /** Total number of end transactions loaded. */
+  private int totalEdits = 0;
   
   public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
     this.fsNamesys = fsNamesys;
     this.lastAppliedTxId = lastAppliedTxId;
   }
   
+  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
+      throws IOException {
+    return loadFSEdits(edits, expectedStartingTxId, null, null);
+  }
+
   /**
    * Load an edit log, and apply the changes to the in-memory structure
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
-      MetaRecoveryContext recovery) throws IOException {
+      StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = createStartupProgressStep(edits);
     prog.beginStep(Phase.LOADING_EDITS, step);
@@ -119,8 +130,8 @@ public class FSEditLogLoader {
     try {
       long startTime = now();
       FSImage.LOG.info("Start loading edits file " + edits.getName());
-      long numEdits = loadEditRecords(edits, false, 
-                                 expectedStartingTxId, recovery);
+      long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
+          startOpt, recovery);
       FSImage.LOG.info("Edits file " + edits.getName() 
           + " of size " + edits.length() + " edits # " + numEdits 
           + " loaded in " + (now()-startTime)/1000 + " seconds");
@@ -133,8 +144,8 @@ public class FSEditLogLoader {
   }
 
   long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
-                      long expectedStartingTxId, MetaRecoveryContext recovery)
-      throws IOException {
+      long expectedStartingTxId, StartupOption startOpt,
+      MetaRecoveryContext recovery) throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
 
     EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
@@ -206,12 +217,23 @@ public class FSEditLogLoader {
             }
           }
           try {
-            long inodeId = applyEditLogOp(op, fsDir, in.getVersion(), lastInodeId);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("op=" + op + ", startOpt=" + startOpt
+                  + ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
+            }
+            long inodeId = applyEditLogOp(op, fsDir, startOpt,
+                in.getVersion(), lastInodeId);
             if (lastInodeId < inodeId) {
               lastInodeId = inodeId;
             }
+          } catch (RollingUpgradeOp.RollbackException e) {
+            throw e;
           } catch (Throwable e) {
             LOG.error("Encountered exception on operation " + op, e);
+            if (recovery == null) {
+              throw e instanceof IOException? (IOException)e: new IOException(e);
+            }
+
             MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
              "apply edit log operation " + op + ": error " +
              e.getMessage(), recovery, "applying edits");
@@ -237,6 +259,10 @@ public class FSEditLogLoader {
             }
           }
           numEdits++;
+          totalEdits++;
+        } catch (RollingUpgradeOp.RollbackException e) {
+          LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
+          break;
         } catch (MetaRecoveryContext.RequestStopException e) {
           MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
               in.getPosition() + "/"  + in.length());
@@ -268,7 +294,8 @@ public class FSEditLogLoader {
     long inodeId = inodeIdFromOp;
 
     if (inodeId == INodeId.GRANDFATHER_INODE_ID) {
-      if (LayoutVersion.supports(Feature.ADD_INODE_ID, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
         throw new IOException("The layout version " + logVersion
             + " supports inodeId but gave bogus inodeId");
       }
@@ -285,7 +312,7 @@ public class FSEditLogLoader {
 
   @SuppressWarnings("deprecation")
   private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
-      int logVersion, long lastInodeId) throws IOException {
+      StartupOption startOpt, int logVersion, long lastInodeId) throws IOException {
     long inodeId = INodeId.GRANDFATHER_INODE_ID;
     if (LOG.isTraceEnabled()) {
       LOG.trace("replaying edit log: " + op);
@@ -693,6 +720,30 @@ public class FSEditLogLoader {
       fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
       break;
     }
+    case OP_ROLLING_UPGRADE_START: {
+      if (startOpt == StartupOption.ROLLINGUPGRADE) {
+        final RollingUpgradeStartupOption rollingUpgradeOpt
+            = startOpt.getRollingUpgradeStartupOption(); 
+        if (rollingUpgradeOpt == RollingUpgradeStartupOption.ROLLBACK) {
+          throw new RollingUpgradeOp.RollbackException();
+        } else if (rollingUpgradeOpt == RollingUpgradeStartupOption.DOWNGRADE) {
+          //ignore upgrade marker
+          break;
+        }
+      }
+      // start rolling upgrade
+      final long startTime = ((RollingUpgradeOp) op).getTime();
+      fsNamesys.startRollingUpgradeInternal(startTime);
+      fsNamesys.triggerRollbackCheckpoint();
+      break;
+    }
+    case OP_ROLLING_UPGRADE_FINALIZE: {
+      final long finalizeTime = ((RollingUpgradeOp) op).getTime();
+      fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
+      fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
+          NameNodeFile.IMAGE);
+      break;
+    }
     case OP_ADD_CACHE_DIRECTIVE: {
       AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
       CacheDirectiveInfo result = fsNamesys.
@@ -931,7 +982,7 @@ public class FSEditLogLoader {
     // The editlog must be emptied by restarting the namenode, before proceeding
     // with the upgrade.
     if (Storage.is203LayoutVersion(logVersion)
-        && logVersion != HdfsConstants.LAYOUT_VERSION) {
+        && logVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
       String msg = "During upgrade failed to load the editlog version "
           + logVersion + " from release 0.20.203. Please go back to the old "
           + " release and restart the namenode. This empties the editlog "
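
The OP_ROLLING_UPGRADE_START case above is the interesting control flow: when the namenode starts with the rolling-upgrade ROLLBACK option, replay must stop exactly at the upgrade marker, so the case throws RollingUpgradeOp.RollbackException, which loadEditRecords() catches and turns into a clean break; the DOWNGRADE option just skips the marker. The same flow in miniature (the enum and exception here are illustrative):

    class ReplaySketch {
      enum Mode { NORMAL, ROLLBACK, DOWNGRADE }

      static class RollbackException extends RuntimeException {
        private static final long serialVersionUID = 1L;
      }

      static void applyUpgradeMarker(Mode mode, long startTime) {
        switch (mode) {
          case ROLLBACK:
            throw new RollbackException();  // replay loop catches this and breaks
          case DOWNGRADE:
            return;                         // ignore the marker entirely
          default:
            startRollingUpgrade(startTime); // normal replay re-enters upgrade
        }
      }

      static void startRollingUpgrade(long time) { /* restore upgrade state */ }
    }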

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Mon Mar 10 23:40:21 2014
@@ -45,6 +45,8 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_ACL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_FINALIZE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_START;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
@@ -179,7 +181,12 @@ public abstract class FSEditLogOp {
       inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
       inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
       inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
+
       inst.put(OP_SET_ACL, new SetAclOp());
+      inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp(
+          OP_ROLLING_UPGRADE_START, "start"));
+      inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp(
+          OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
     }
     
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -267,8 +274,8 @@ public abstract class FSEditLogOp {
   
   void readRpcIds(DataInputStream in, int logVersion)
       throws IOException {
-    if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE,
-        logVersion)) {
+    if (NameNodeLayoutVersion.supports(
+        LayoutVersion.Feature.EDITLOG_SUPPORT_RETRYCACHE, logVersion)) {
       this.rpcClientId = FSImageSerialization.readBytes(in);
       this.rpcCallId = FSImageSerialization.readInt(in);
     }
@@ -315,7 +322,7 @@ public abstract class FSEditLogOp {
 
     private static List<AclEntry> read(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EXTENDED_ACL, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(Feature.EXTENDED_ACL, logVersion)) {
         return null;
       }
 
@@ -480,18 +487,20 @@ public abstract class FSEditLogOp {
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
       }
-      if (LayoutVersion.supports(Feature.ADD_INODE_ID, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
         this.inodeId = in.readLong();
       } else {
         // The inodeId should be updated when this editLogOp is applied
         this.inodeId = INodeId.GRANDFATHER_INODE_ID;
       }
       if ((-17 < logVersion && length != 4) ||
-          (logVersion <= -17 && length != 5 && !LayoutVersion.supports(
-              Feature.EDITLOG_OP_OPTIMIZATION, logVersion))) {
+          (logVersion <= -17 && length != 5 && !NameNodeLayoutVersion.supports(
+              LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion))) {
         throw new IOException("Incorrect data format."  +
                               " logVersion is " + logVersion +
                               " but writables.length is " +
@@ -499,7 +508,8 @@ public abstract class FSEditLogOp {
       }
       this.path = FSImageSerialization.readString(in);
 
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.replication = FSImageSerialization.readShort(in);
         this.mtime = FSImageSerialization.readLong(in);
       } else {
@@ -507,8 +517,10 @@ public abstract class FSEditLogOp {
         this.mtime = readLong(in);
       }
 
-      if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-        if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.FILE_ACCESS_TIME, logVersion)) {
+        if (NameNodeLayoutVersion.supports(
+            LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
           this.atime = FSImageSerialization.readLong(in);
         } else {
           this.atime = readLong(in);
@@ -517,7 +529,8 @@ public abstract class FSEditLogOp {
         this.atime = 0;
       }
 
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.blockSize = FSImageSerialization.readLong(in);
       } else {
         this.blockSize = readLong(in);
@@ -933,7 +946,8 @@ public abstract class FSEditLogOp {
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
       this.path = FSImageSerialization.readString(in);
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.replication = FSImageSerialization.readShort(in);
       } else {
         this.replication = readShort(in);
@@ -1024,7 +1038,8 @@ public abstract class FSEditLogOp {
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
         if (length < 3) { // trg, srcs.., timestamp
           throw new IOException("Incorrect data format " +
@@ -1033,7 +1048,8 @@ public abstract class FSEditLogOp {
       }
       this.trg = FSImageSerialization.readString(in);
       int srcSize = 0;
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         srcSize = in.readInt();
       } else {
         srcSize = this.length - 1 - 1; // trg and timestamp
@@ -1052,7 +1068,8 @@ public abstract class FSEditLogOp {
         srcs[i]= FSImageSerialization.readString(in);
       }
       
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.timestamp = FSImageSerialization.readLong(in);
       } else {
         this.timestamp = readLong(in);
@@ -1158,7 +1175,8 @@ public abstract class FSEditLogOp {
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
         if (this.length != 3) {
           throw new IOException("Incorrect data format. "
@@ -1167,7 +1185,8 @@ public abstract class FSEditLogOp {
       }
       this.src = FSImageSerialization.readString(in);
       this.dst = FSImageSerialization.readString(in);
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.timestamp = FSImageSerialization.readLong(in);
       } else {
         this.timestamp = readLong(in);
@@ -1254,14 +1273,16 @@ public abstract class FSEditLogOp {
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
         if (this.length != 2) {
           throw new IOException("Incorrect data format. " + "delete operation.");
         }
       }
       this.path = FSImageSerialization.readString(in);
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.timestamp = FSImageSerialization.readLong(in);
       } else {
         this.timestamp = readLong(in);
@@ -1362,22 +1383,26 @@ public abstract class FSEditLogOp {
     
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
       }
       if (-17 < logVersion && length != 2 ||
           logVersion <= -17 && length != 3
-          && !LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+          && !NameNodeLayoutVersion.supports(
+              LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         throw new IOException("Incorrect data format. Mkdir operation.");
       }
-      if (LayoutVersion.supports(Feature.ADD_INODE_ID, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
         this.inodeId = FSImageSerialization.readLong(in);
       } else {
         // This id should be updated when this editLogOp is applied
         this.inodeId = INodeId.GRANDFATHER_INODE_ID;
       }
       this.path = FSImageSerialization.readString(in);
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.timestamp = FSImageSerialization.readLong(in);
       } else {
         this.timestamp = readLong(in);
@@ -1386,8 +1411,10 @@ public abstract class FSEditLogOp {
       // The disk format stores atimes for directories as well.
       // However, currently this is not being updated/used because of
       // performance reasons.
-      if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-        if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.FILE_ACCESS_TIME, logVersion)) {
+        if (NameNodeLayoutVersion.supports(
+            LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
           FSImageSerialization.readLong(in);
         } else {
           readLong(in);
@@ -1977,7 +2004,8 @@ public abstract class FSEditLogOp {
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
         if (length != 3) {
           throw new IOException("Incorrect data format. " + "times operation.");
@@ -1985,7 +2013,8 @@ public abstract class FSEditLogOp {
       }
       this.path = FSImageSerialization.readString(in);
 
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.mtime = FSImageSerialization.readLong(in);
         this.atime = FSImageSerialization.readLong(in);
       } else {
@@ -2094,14 +2123,16 @@ public abstract class FSEditLogOp {
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
         if (this.length != 4) {
           throw new IOException("Incorrect data format. "
               + "symlink operation.");
         }
       }
-      if (LayoutVersion.supports(Feature.ADD_INODE_ID, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
         this.inodeId = FSImageSerialization.readLong(in);
       } else {
         // This id should be updated when the editLogOp is applied
@@ -2110,7 +2141,8 @@ public abstract class FSEditLogOp {
       this.path = FSImageSerialization.readString(in);
       this.value = FSImageSerialization.readString(in);
 
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.mtime = FSImageSerialization.readLong(in);
         this.atime = FSImageSerialization.readLong(in);
       } else {
@@ -2228,7 +2260,8 @@ public abstract class FSEditLogOp {
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      if (!LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.length = in.readInt();
         if (this.length != 3) {
           throw new IOException("Incorrect data format. " + "Rename operation.");
@@ -2237,7 +2270,8 @@ public abstract class FSEditLogOp {
       this.src = FSImageSerialization.readString(in);
       this.dst = FSImageSerialization.readString(in);
 
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.timestamp = FSImageSerialization.readLong(in);
       } else {
         this.timestamp = readLong(in);
@@ -2448,7 +2482,8 @@ public abstract class FSEditLogOp {
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.expiryTime = FSImageSerialization.readLong(in);
       } else {
         this.expiryTime = readLong(in);
@@ -2520,7 +2555,8 @@ public abstract class FSEditLogOp {
         throws IOException {
       this.token = new DelegationTokenIdentifier();
       this.token.readFields(in);
-      if (LayoutVersion.supports(Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
         this.expiryTime = FSImageSerialization.readLong(in);
       } else {
         this.expiryTime = readLong(in);
@@ -3544,6 +3580,65 @@ public abstract class FSEditLogOp {
       this.len = in.readLong();
     }
   }
+  /**
+   * Operation corresponding to upgrade
+   */
+  static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
+    private final String name;
+    private long time;
+
+    public RollingUpgradeOp(FSEditLogOpCodes code, String name) {
+      super(code);
+      this.name = name.toUpperCase();
+    }
+
+    static RollingUpgradeOp getStartInstance(OpInstanceCache cache) {
+      return (RollingUpgradeOp) cache.get(OP_ROLLING_UPGRADE_START);
+    }
+
+    static RollingUpgradeOp getFinalizeInstance(OpInstanceCache cache) {
+      return (RollingUpgradeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
+    }
+
+    long getTime() {
+      return time;
+    }
+
+    void setTime(long time) {
+      this.time = time;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      time = in.readLong();
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeLong(time, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, name + "TIME",
+          Long.toString(time));
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.time = Long.parseLong(st.getValue(name + "TIME"));
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append("RollingUpgradeOp [").append(name)
+          .append(", time=").append(time).append("]").toString();
+    }
+    
+    static class RollbackException extends IOException {
+      private static final long serialVersionUID = 1L;
+    }
+  }
 
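The new RollingUpgradeOp above derives its XML field name from the uppercased op name, so the marker's timestamp round-trips through a key of the form name + "TIME". A minimal standalone sketch of that convention (plain Java, no Hadoop types; the lowercase "start"/"finalize" inputs are an assumption about how the op cache names its instances):

    import java.util.HashMap;
    import java.util.Map;

    class UpgradeMarkerXml {
      private final String name; // e.g. "start" or "finalize" (assumed inputs)
      private long time;

      UpgradeMarkerXml(String name) {
        // Mirrors RollingUpgradeOp's constructor: the name is uppercased once.
        this.name = name.toUpperCase();
      }

      // Mirrors toXml: the single serialized field key is name + "TIME".
      Map<String, String> toXml() {
        Map<String, String> fields = new HashMap<String, String>();
        fields.put(name + "TIME", Long.toString(time));
        return fields;
      }

      // Mirrors fromXml: read the timestamp back under the same derived key.
      void fromXml(Map<String, String> fields) {
        this.time = Long.parseLong(fields.get(name + "TIME"));
      }
    }

Under this convention an op created with "start" would serialize its time under STARTTIME, and one created with "finalize" under FINALIZETIME.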
   /**
    * Class for writing editlog ops
@@ -3594,7 +3689,8 @@ public abstract class FSEditLogOp {
      */
     public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
       this.logVersion = logVersion;
-      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
         this.checksum = new PureJavaCrc32();
       } else {
         this.checksum = null;
@@ -3733,7 +3829,8 @@ public abstract class FSEditLogOp {
         throw new IOException("Read invalid opcode " + opCode);
       }
 
-      if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+      if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
         // Read the txid
         op.setTransactionId(in.readLong());
       } else {

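Every hunk in FSEditLogOp.java above makes the same mechanical substitution: the layout-version check moves from the shared LayoutVersion.supports(...) to NameNodeLayoutVersion.supports(...), so NameNode features can evolve on their own layout-version line. A hedged sketch of the resulting read pattern (it assumes FSImageSerialization's readLong/readString helpers are visible to the caller, and the string-encoded legacy fallback is an assumption about the pre-optimization format):

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.LayoutVersion;
    import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
    import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;

    class VersionGatedRead {
      // Reads a numeric field the same way the patched ops do: raw long for
      // optimized logs, string-encoded long for older layouts.
      static long readTime(DataInputStream in, int logVersion)
          throws IOException {
        if (NameNodeLayoutVersion.supports(
            LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
          return FSImageSerialization.readLong(in);
        }
        // Legacy encoding (assumption): numbers were written as strings.
        return Long.parseLong(FSImageSerialization.readString(in));
      }
    }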
Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Mon Mar 10 23:40:21 2014
@@ -66,8 +66,10 @@ public enum FSEditLogOpCodes {
   OP_ADD_CACHE_POOL                       ((byte) 36),
   OP_MODIFY_CACHE_POOL                    ((byte) 37),
   OP_REMOVE_CACHE_POOL                    ((byte) 38),
-  OP_MODIFY_CACHE_DIRECTIVE    ((byte) 39),
+  OP_MODIFY_CACHE_DIRECTIVE     ((byte) 39),
   OP_SET_ACL                    ((byte) 40),
+  OP_ROLLING_UPGRADE_START      ((byte) 41),
+  OP_ROLLING_UPGRADE_FINALIZE   ((byte) 42),
 
   // Note that the current range of the valid OP code is 0~127
   OP_INVALID                    ((byte) -1);

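The two rolling-upgrade opcodes take the next free bytes, 41 and 42, staying inside the 0~127 range called out in the comment above (the opcode is read as a signed Java byte, and -1 is reserved for OP_INVALID). A small self-contained sketch of the byte-to-enum decode this implies (a hypothetical decoder for illustration, not the class's own lookup code):

    import java.util.HashMap;
    import java.util.Map;

    enum OpCode {
      OP_ROLLING_UPGRADE_START((byte) 41),
      OP_ROLLING_UPGRADE_FINALIZE((byte) 42),
      OP_INVALID((byte) -1);

      private final byte opCode;
      private static final Map<Byte, OpCode> BY_BYTE =
          new HashMap<Byte, OpCode>();
      static {
        for (OpCode op : values()) {
          BY_BYTE.put(op.opCode, op);
        }
      }

      OpCode(byte opCode) {
        this.opCode = opCode;
      }

      // Unknown bytes decode to OP_INVALID rather than throwing here, so the
      // reader can report a bad opcode with full context.
      static OpCode fromByte(byte b) {
        OpCode op = BY_BYTE.get(b);
        return op == null ? OP_INVALID : op;
      }
    }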
Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1576130&r1=1576129&r2=1576130&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Mar 10 23:40:21 2014
@@ -21,11 +21,13 @@ import static org.apache.hadoop.util.Tim
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -41,18 +43,15 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
 import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.util.Time.now;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -218,13 +217,18 @@ public class FSImage implements Closeabl
       NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
     }
     if (startOpt != StartupOption.UPGRADE
+        && !RollingUpgradeStartupOption.STARTED.matches(startOpt)
         && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
-        && layoutVersion != HdfsConstants.LAYOUT_VERSION) {
+        && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
       throw new IOException(
           "\nFile system image contains an old layout version " 
           + storage.getLayoutVersion() + ".\nAn upgrade to version "
-          + HdfsConstants.LAYOUT_VERSION + " is required.\n"
-          + "Please restart NameNode with -upgrade option.");
+          + HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
+          + "Please restart NameNode with the \""
+          + RollingUpgradeStartupOption.STARTED.getOptionString()
+          + "\" option if a rolling upgrade is already started;"
+          + " or restart NameNode with the \""
+          + StartupOption.UPGRADE + "\" option to start a new upgrade.");
     }
     
     storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
@@ -263,7 +267,7 @@ public class FSImage implements Closeabl
       // just load the image
     }
     
-    return loadFSImage(target, recovery);
+    return loadFSImage(target, startOpt, recovery);
   }
   
   /**
@@ -319,8 +323,9 @@ public class FSImage implements Closeabl
     return isFormatted;
   }
 
-  private void doUpgrade(FSNamesystem target) throws IOException {
-    // Upgrade is allowed only if there are 
+  /** Check whether an upgrade (or rolling upgrade) is allowed to start. */
+  void checkUpgrade(FSNamesystem target) throws IOException {
+    // Upgrade or rolling upgrade is allowed only if there are 
     // no previous fs states in any of the directories
     for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -329,15 +334,37 @@ public class FSImage implements Closeabl
             "previous fs state should not exist during upgrade. "
             + "Finalize or rollback first.");
     }
+  }
+
+  /**
+   * @return true if a rollback fsimage (for rolling upgrade) exists in the
+   * NameNode directory.
+   */
+  public boolean hasRollbackFSImage() throws IOException {
+    final FSImageStorageInspector inspector = new FSImageTransactionalStorageInspector(
+        EnumSet.of(NameNodeFile.IMAGE_ROLLBACK));
+    storage.inspectStorageDirs(inspector);
+    try {
+      List<FSImageFile> images = inspector.getLatestImages();
+      return images != null && !images.isEmpty();
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  void doUpgrade(FSNamesystem target) throws IOException {
+    checkUpgrade(target);
 
     // load the latest image
-    this.loadFSImage(target, null);
+    this.loadFSImage(target, null, null);
 
     // Do upgrade for each directory
+    target.checkRollingUpgrade("upgrade namenode");
+    
     long oldCTime = storage.getCTime();
     storage.cTime = now();  // generate new cTime for the state
     int oldLV = storage.getLayoutVersion();
-    storage.layoutVersion = HdfsConstants.LAYOUT_VERSION;
+    storage.layoutVersion = HdfsConstants.NAMENODE_LAYOUT_VERSION;
     
     List<StorageDirectory> errorSDs =
       Collections.synchronizedList(new ArrayList<StorageDirectory>());
@@ -411,7 +438,7 @@ public class FSImage implements Closeabl
     boolean canRollback = false;
     FSImage prevState = new FSImage(conf);
     try {
-      prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
+      prevState.getStorage().layoutVersion = HdfsConstants.NAMENODE_LAYOUT_VERSION;
       for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
         StorageDirectory sd = it.next();
         File prevDir = sd.getPreviousDir();
@@ -426,12 +453,12 @@ public class FSImage implements Closeabl
         // read and verify consistency of the prev dir
         prevState.getStorage().readPreviousVersionProperties(sd);
 
-        if (prevState.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
+        if (prevState.getLayoutVersion() != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
           throw new IOException(
             "Cannot rollback to storage version " +
                 prevState.getLayoutVersion() +
                 " using this version of the NameNode, which uses storage version " +
-                HdfsConstants.LAYOUT_VERSION + ". " +
+                HdfsConstants.NAMENODE_LAYOUT_VERSION + ". " +
               "Please use the previous version of HDFS to perform the rollback.");
         }
         canRollback = true;
@@ -525,7 +552,7 @@ public class FSImage implements Closeabl
     // return back the real image
     realImage.getStorage().setStorageInfo(ckptImage.getStorage());
     realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
-    realImage.initEditLog();
+    realImage.initEditLog(StartupOption.IMPORT);
 
     target.dir.fsImage = realImage;
     realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
@@ -586,13 +613,22 @@ public class FSImage implements Closeabl
    * @return whether the image should be saved
    * @throws IOException
    */
-  boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
+  private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
+      MetaRecoveryContext recovery)
       throws IOException {
-    FSImageStorageInspector inspector = storage.readAndInspectDirs();
-    FSImageFile imageFile = null;
-    
+    final boolean rollingRollback
+        = RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
+    final EnumSet<NameNodeFile> nnfs;
+    if (rollingRollback) {
+      // if this is a rollback of a rolling upgrade, only load the rollback image
+      nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
+    } else {
+      // otherwise we can load from both IMAGE and IMAGE_ROLLBACK
+      nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
+    }
+    final FSImageStorageInspector inspector = storage.readAndInspectDirs(nnfs);
+
     isUpgradeFinalized = inspector.isUpgradeFinalized();
- 
     List<FSImageFile> imageFiles = inspector.getLatestImages();
 
     StartupProgress prog = NameNode.getStartupProgress();
@@ -604,14 +640,24 @@ public class FSImage implements Closeabl
 
     Iterable<EditLogInputStream> editStreams = null;
 
-    initEditLog();
+    initEditLog(startOpt);
 
-    if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
-                               getLayoutVersion())) {
+    if (NameNodeLayoutVersion.supports(
+        LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
       // If we're open for write, we're either non-HA or we're the active NN, so
       // we better be able to load all the edits. If we're the standby NN, it's
       // OK to not be able to read all of edits right now.
-      long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
+      // Meanwhile, for an HA upgrade we still write to the edit log, so
+      // toAtLeastTxId must be set to the max-seen txid.
+      // For a rollback of a rolling upgrade, toAtLeastTxId must be set to
+      // the txid right before the upgrade marker.
+      long toAtLeastTxId =
+          editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
+      if (rollingRollback) {
+        // note that the first image in imageFiles is the special checkpoint
+        // for the rolling upgrade
+        toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
+      }
       editStreams = editLog.selectInputStreams(
           imageFiles.get(0).getCheckpointTxId() + 1,
           toAtLeastTxId, recovery, false);
@@ -619,8 +665,7 @@ public class FSImage implements Closeabl
       editStreams = FSImagePreTransactionalStorageInspector
         .getEditLogStreams(storage);
     }
-    int maxOpSize = conf.getInt(DFSConfigKeys.
-          DFS_NAMENODE_MAX_OP_SIZE_KEY,
+    int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
     for (EditLogInputStream elis : editStreams) {
       elis.setMaxOpSize(maxOpSize);
@@ -633,6 +678,7 @@ public class FSImage implements Closeabl
       LOG.info("No edit log streams selected.");
     }
     
+    FSImageFile imageFile = null;
     for (int i = 0; i < imageFiles.size(); i++) {
       try {
         imageFile = imageFiles.get(i);
@@ -650,26 +696,57 @@ public class FSImage implements Closeabl
       throw new IOException("Failed to load an FSImage file!");
     }
     prog.endPhase(Phase.LOADING_FSIMAGE);
-    long txnsAdvanced = loadEdits(editStreams, target, recovery);
-    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
-                                                    txnsAdvanced);
+    
+    if (!rollingRollback) {
+      long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
+      needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
+          txnsAdvanced);
+      if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
+        // rename rollback image if it is downgrade
+        renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
+      }
+    } else {
+      // Trigger the rollback for the rolling upgrade. Here lastAppliedTxId
+      // equals the last txid in the rollback fsimage.
+      rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
+      needToSave = false;
+    }
     editLog.setNextTxId(lastAppliedTxId + 1);
     return needToSave;
   }
 
+  /** Rollback for rolling upgrade. */
+  private void rollingRollback(long discardSegmentTxId, long ckptId)
+      throws IOException {
+    // discard unnecessary editlog segments starting from the given id
+    this.editLog.discardSegments(discardSegmentTxId);
+    // rename the special checkpoint
+    renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE,
+        true);
+    // purge all the checkpoints after the marker
+    archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId);
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    if (HAUtil.isHAEnabled(conf, nameserviceId)) {
+      // close the editlog since it is currently open for write
+      this.editLog.close();
+      // reopen the editlog for read
+      this.editLog.initSharedJournalsForRead();
+    }
+  }
+
   void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
       FSImageFile imageFile) throws IOException {
     LOG.debug("Planning to load image :\n" + imageFile);
     StorageDirectory sdForProperties = imageFile.sd;
     storage.readProperties(sdForProperties);
 
-    if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
-                               getLayoutVersion())) {
+    if (NameNodeLayoutVersion.supports(
+        LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
       // For txid-based layout, we should have a .md5 file
       // next to the image file
       loadFSImage(imageFile.getFile(), target, recovery);
-    } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
-                                      getLayoutVersion())) {
+    } else if (NameNodeLayoutVersion.supports(
+        LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) {
       // In 0.22, we have the checksum stored in the VERSION file.
       String md5 = storage.getDeprecatedProperty(
           NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
@@ -686,11 +763,15 @@ public class FSImage implements Closeabl
     }
   }
 
-  public void initEditLog() {
+  public void initEditLog(StartupOption startOpt) {
     Preconditions.checkState(getNamespaceID() != 0,
         "Must know namespace ID before initting edit log");
     String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
-    if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
+    if (!HAUtil.isHAEnabled(conf, nameserviceId) || 
+        (HAUtil.isHAEnabled(conf, nameserviceId) && 
+            RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
+      // If this NN is not HA, or it is HA but we are rolling back a rolling
+      // upgrade, init the edit log for write.
       editLog.initJournalsForWrite();
       editLog.recoverUnclosedStreams();
     } else {
@@ -722,7 +803,13 @@ public class FSImage implements Closeabl
    * Load the specified list of edit files into the image.
    */
   public long loadEdits(Iterable<EditLogInputStream> editStreams,
-      FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
+      FSNamesystem target) throws IOException {
+    return loadEdits(editStreams, target, null, null);
+  }
+
+  private long loadEdits(Iterable<EditLogInputStream> editStreams,
+      FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
+      throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
     StartupProgress prog = NameNode.getStartupProgress();
     prog.beginPhase(Phase.LOADING_EDITS);
@@ -736,7 +823,7 @@ public class FSImage implements Closeabl
         LOG.info("Reading " + editIn + " expecting start txid #" +
               (lastAppliedTxId + 1));
         try {
-          loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery);
+          loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
         } finally {
           // Update lastAppliedTxId even in case of error, since some ops may
           // have been successfully applied before the error.
@@ -750,7 +837,7 @@ public class FSImage implements Closeabl
     } finally {
       FSEditLog.closeAllStreams(editStreams);
       // update the counts
-      updateCountForQuota(target.dir.rootDir);   
+      updateCountForQuota(target.dir.rootDir);
     }
     prog.endPhase(Phase.LOADING_EDITS);
     return lastAppliedTxId - prevLastAppliedTxId;
@@ -828,9 +915,12 @@ public class FSImage implements Closeabl
    */
   private void loadFSImage(File curFile, MD5Hash expectedMd5,
       FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
+    // BlockPoolId is required when the FsImageLoader loads the rolling upgrade
+    // information. Make sure the ID is properly set.
+    target.setBlockPoolId(this.getBlockPoolID());
+
     FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
     loader.load(curFile);
-    target.setBlockPoolId(this.getBlockPoolID());
 
     // Check that the image digest we loaded matches up with what
     // we expected
@@ -851,11 +941,11 @@ public class FSImage implements Closeabl
   /**
    * Save the contents of the FS image to the file.
    */
-  void saveFSImage(SaveNamespaceContext context, StorageDirectory sd)
-      throws IOException {
+  void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
+      NameNodeFile dstType) throws IOException {
     long txid = context.getTxId();
     File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
-    File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
+    File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
     
     FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
     FSImageCompression compression = FSImageCompression.createCompression(conf);
@@ -879,16 +969,19 @@ public class FSImage implements Closeabl
   private class FSImageSaver implements Runnable {
     private final SaveNamespaceContext context;
     private StorageDirectory sd;
+    private final NameNodeFile nnf;
 
-    public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd) {
+    public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd,
+        NameNodeFile nnf) {
       this.context = context;
       this.sd = sd;
+      this.nnf = nnf;
     }
 
     @Override
     public void run() {
       try {
-        saveFSImage(context, sd);
+        saveFSImage(context, sd, nnf);
       } catch (SaveNamespaceCancelledException snce) {
         LOG.info("Cancelled image saving for " + sd.getRoot() +
             ": " + snce.getMessage());
@@ -924,17 +1017,18 @@ public class FSImage implements Closeabl
    */
   public synchronized void saveNamespace(FSNamesystem source)
       throws IOException {
-    saveNamespace(source, null);
+    saveNamespace(source, NameNodeFile.IMAGE, null);
   }
   
   /**
    * Save the contents of the FS image to a new image file in each of the
    * current storage directories.
-   * @param canceler 
+   * @param canceler
    */
-  public synchronized void saveNamespace(FSNamesystem source,
+  public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
       Canceler canceler) throws IOException {
     assert editLog != null : "editLog must be initialized";
+    LOG.info("Save namespace ...");
     storage.attemptRestoreRemovedStorage();
 
     boolean editLogWasOpen = editLog.isSegmentOpen();
@@ -944,7 +1038,7 @@ public class FSImage implements Closeabl
     }
     long imageTxId = getLastAppliedOrWrittenTxId();
     try {
-      saveFSImageInAllDirs(source, imageTxId, canceler);
+      saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
       storage.writeAll();
     } finally {
       if (editLogWasOpen) {
@@ -963,12 +1057,11 @@ public class FSImage implements Closeabl
    */
   protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
       throws IOException {
-    saveFSImageInAllDirs(source, txid, null);
+    saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
   }
 
-  protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
-      Canceler canceler)
-      throws IOException {    
+  private synchronized void saveFSImageInAllDirs(FSNamesystem source,
+      NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     prog.beginPhase(Phase.SAVING_CHECKPOINT);
     if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
@@ -986,7 +1079,7 @@ public class FSImage implements Closeabl
       for (Iterator<StorageDirectory> it
              = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
         StorageDirectory sd = it.next();
-        FSImageSaver saver = new FSImageSaver(ctx, sd);
+        FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
         Thread saveThread = new Thread(saver, saver.toString());
         saveThreads.add(saveThread);
         saveThread.start();
@@ -1005,11 +1098,11 @@ public class FSImage implements Closeabl
         assert false : "should have thrown above!";
       }
   
-      renameCheckpoint(txid);
+      renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
   
       // Since we now have a new checkpoint, we can clean up some
       // old edit logs and checkpoints.
-      purgeOldStorage();
+      purgeOldStorage(nnf);
     } finally {
       // Notify any threads waiting on the checkpoint to be canceled
       // that it is complete.
@@ -1023,23 +1116,24 @@ public class FSImage implements Closeabl
    * Purge any files in the storage directories that are no longer
    * necessary.
    */
-  public void purgeOldStorage() {
+  void purgeOldStorage(NameNodeFile nnf) {
     try {
-      archivalManager.purgeOldStorage();
+      archivalManager.purgeOldStorage(nnf);
     } catch (Exception e) {
-      LOG.warn("Unable to purge old storage", e);
+      LOG.warn("Unable to purge old storage " + nnf.getName(), e);
     }
   }
 
   /**
-   * Renames new image
+   * Rename the fsimage file with the given txid
    */
-  private void renameCheckpoint(long txid) throws IOException {
+  private void renameCheckpoint(long txid, NameNodeFile fromNnf,
+      NameNodeFile toNnf, boolean renameMD5) throws IOException {
     ArrayList<StorageDirectory> al = null;
 
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
       try {
-        renameCheckpointInDir(sd, txid);
+        renameImageFileInDir(sd, fromNnf, toNnf, txid, renameMD5);
       } catch (IOException ioe) {
         LOG.warn("Unable to rename checkpoint in " + sd, ioe);
         if (al == null) {
@@ -1050,7 +1144,33 @@ public class FSImage implements Closeabl
     }
     if(al != null) storage.reportErrorsOnDirectories(al);
   }
-  
+
+  /**
+   * Rename all the fsimage files of the given NameNodeFile type. The
+   * associated checksum files will also be renamed.
+   */
+  void renameCheckpoint(NameNodeFile fromNnf, NameNodeFile toNnf)
+      throws IOException {
+    ArrayList<StorageDirectory> al = null;
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector(EnumSet.of(fromNnf));
+    storage.inspectStorageDirs(inspector);
+    for (FSImageFile image : inspector.getFoundImages()) {
+      try {
+        renameImageFileInDir(image.sd, fromNnf, toNnf, image.txId, true);
+      } catch (IOException ioe) {
+        LOG.warn("Unable to rename checkpoint in " + image.sd, ioe);
+        if (al == null) {
+          al = Lists.newArrayList();
+        }
+        al.add(image.sd);
+      }
+    }
+    if(al != null) {
+      storage.reportErrorsOnDirectories(al);
+    }
+  }
+
   /**
    * Deletes the checkpoint file in every storage directory,
    * since the checkpoint was cancelled.
@@ -1068,23 +1188,24 @@ public class FSImage implements Closeabl
     storage.reportErrorsOnDirectories(al);
   }
 
-
-  private void renameCheckpointInDir(StorageDirectory sd, long txid)
-      throws IOException {
-    File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
-    File curFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
-    // renameTo fails on Windows if the destination file 
-    // already exists.
+  private void renameImageFileInDir(StorageDirectory sd, NameNodeFile fromNnf,
+      NameNodeFile toNnf, long txid, boolean renameMD5) throws IOException {
+    final File fromFile = NNStorage.getStorageFile(sd, fromNnf, txid);
+    final File toFile = NNStorage.getStorageFile(sd, toNnf, txid);
+    // renameTo fails on Windows if the destination file already exists.
     if(LOG.isDebugEnabled()) {
-      LOG.debug("renaming  " + ckpt.getAbsolutePath() 
-                + " to " + curFile.getAbsolutePath());
+      LOG.debug("renaming  " + fromFile.getAbsolutePath() 
+                + " to " + toFile.getAbsolutePath());
     }
-    if (!ckpt.renameTo(curFile)) {
-      if (!curFile.delete() || !ckpt.renameTo(curFile)) {
-        throw new IOException("renaming  " + ckpt.getAbsolutePath() + " to "  + 
-            curFile.getAbsolutePath() + " FAILED");
+    if (!fromFile.renameTo(toFile)) {
+      if (!toFile.delete() || !fromFile.renameTo(toFile)) {
+        throw new IOException("renaming  " + fromFile.getAbsolutePath() + " to "  + 
+            toFile.getAbsolutePath() + " FAILED");
       }
-    }    
+    }
+    if (renameMD5) {
+      MD5FileUtils.renameMD5File(fromFile, toFile);
+    }
   }
 
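The rewritten renameImageFileInDir generalizes the old checkpoint rename but keeps its Windows workaround: java.io.File.renameTo returns false there when the destination already exists, so a failed rename deletes the target and retries once. A standalone sketch of the same idiom (illustrative file names only):

    import java.io.File;
    import java.io.IOException;

    class RenameWithRetry {
      // Rename from -> to; on platforms where renameTo refuses to clobber an
      // existing destination, delete it and retry once before giving up.
      static void rename(File from, File to) throws IOException {
        if (!from.renameTo(to)) {
          if (!to.delete() || !from.renameTo(to)) {
            throw new IOException("renaming " + from.getAbsolutePath()
                + " to " + to.getAbsolutePath() + " FAILED");
          }
        }
      }

      public static void main(String[] args) throws IOException {
        // Illustrative names only, echoing the fsimage_N.ckpt -> fsimage_N move.
        rename(new File("fsimage_42.ckpt"), new File("fsimage_42"));
      }
    }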
   CheckpointSignature rollEditLog() throws IOException {
@@ -1165,13 +1286,13 @@ public class FSImage implements Closeabl
    * renames the image from fsimage_N.ckpt to fsimage_N and also
    * saves the related .md5 file into place.
    */
-  public synchronized void saveDigestAndRenameCheckpointImage(
+  public synchronized void saveDigestAndRenameCheckpointImage(NameNodeFile nnf,
       long txid, MD5Hash digest) throws IOException {
     // Write and rename MD5 file
     List<StorageDirectory> badSds = Lists.newArrayList();
     
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
-      File imageFile = NNStorage.getImageFile(sd, txid);
+      File imageFile = NNStorage.getImageFile(sd, nnf, txid);
       try {
         MD5FileUtils.saveMD5File(imageFile, digest);
       } catch (IOException ioe) {
@@ -1183,7 +1304,7 @@ public class FSImage implements Closeabl
     CheckpointFaultInjector.getInstance().afterMD5Rename();
     
     // Rename image from tmp file
-    renameCheckpoint(txid);
+    renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
     // So long as this is the newest image available,
     // advertise it as such to other checkpointers
     // from now on


