hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1228651 [1/2] - in /hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/s...
Date Sat, 07 Jan 2012 16:28:28 GMT
Author: szetszwo
Date: Sat Jan  7 16:28:27 2012
New Revision: 1228651

URL: http://svn.apache.org/viewvc?rev=1228651&view=rev
Log:
svn merge -c 1165826 from trunk for HDFS-2018.

Modified:
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
    hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jan  7 16:28:27 2012
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932,1189982,1190077,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205146,1205260,1205626,1206178,1206786,1206830,1207585,1207694,1208140,1208153,1208313,1212021,1212062,1212073,1212084,1212299,1213537,1213586,1213592-1213593,1213954,1214027,1214046,1220510,1221106,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1165826,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932,1189982,1190077,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205146,1205260,1205626,1206178,1206786,1206830,1207585,1207694,1208140,1208153,1208313,1212021,1212062,1212073,1212084,1212299,1213537,1213586,1213592-1213593,1213954,1214027,1214046,1220510,1221106,1221348,1226211,1227091,1227423
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Jan  7 16:28:27 2012
@@ -82,6 +82,9 @@ Release 0.23.1 - UNRELEASED
     for a client on the same node as the block file.  (Andrew Purtell,
     Suresh Srinivas and Jitendra Nath Pandey via szetszwo)
 
+    HDFS-2018. Move all journal stream management code into one place.
+               (Ivan Kelly via jitendra)
+
   BUG FIXES
 
     HDFS-2541. For a sufficiently large value of blocks, the DN Scanner 

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jan  7 16:28:27 2012
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932,1189982,1190077,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205146,1205260,1206786,1206830,1207694,1208140,1208153,1208313,1212021,1212062,1212073,1212084,1212299,1213537,1213586,1213592-1213593,1213954,1214027,1214046,1220510,1221106,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1165826,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177757,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932,1189982,1190077,1190708,1195575,1195656,1195731,1195754,1196113,1196129,1197329,1198903,1199396,1200731,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205146,1205260,1206786,1206830,1207694,1208140,1208153,1208313,1212021,1212062,1212073,1212084,1212299,1213537,1213586,1213592-1213593,1213954,1214027,1214046,1220510,1221106,1221348,1226211,1227091,1227423
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Sat Jan  7 16:28:27 2012
@@ -18,18 +18,21 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.zip.Checksum;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * Extension of FSImage for the backup node.
@@ -257,11 +260,18 @@ public class BackupImage extends FSImage
         new FSImageTransactionalStorageInspector();
       
       storage.inspectStorageDirs(inspector);
-      LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
-          target - 1);
-  
-      logLoadPlan.doRecovery();
-      loadEdits(logLoadPlan.getEditsFiles());
+
+      editLog.recoverUnclosedStreams();
+      Iterable<EditLogInputStream> editStreamsAll 
+        = editLog.selectInputStreams(lastAppliedTxId, target - 1);
+      // skip the in-progress segment; it is loaded separately below
+      List<EditLogInputStream> editStreams = Lists.newArrayList();
+      for (EditLogInputStream s : editStreamsAll) {
+        if (s.getFirstTxId() != editLog.getCurSegmentTxId()) {
+          editStreams.add(s);
+        }
+      }
+      loadEdits(editStreams);
     }
     
     // now, need to load the in-progress file
@@ -271,7 +281,24 @@ public class BackupImage extends FSImage
         return false; // drop lock and try again to load local logs
       }
       
-      EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
+      EditLogInputStream stream = null;
+      Collection<EditLogInputStream> editStreams
+        = getEditLog().selectInputStreams(
+            getEditLog().getCurSegmentTxId(),
+            getEditLog().getCurSegmentTxId());
+      
+      for (EditLogInputStream s : editStreams) {
+        if (s.getFirstTxId() == getEditLog().getCurSegmentTxId()) {
+          stream = s;
+        }
+        break;
+      }
+      if (stream == null) {
+        LOG.warn("Unable to find stream starting with " + editLog.getCurSegmentTxId()
+                 + ". This indicates that there is an error in synchronization in BackupImage");
+        return false;
+      }
+
       try {
         long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
         
@@ -285,7 +312,7 @@ public class BackupImage extends FSImage
           "expected to load " + remainingTxns + " but loaded " +
           numLoaded + " from " + stream;
       } finally {
-        IOUtils.closeStream(stream);
+        FSEditLog.closeAllStreams(editStreams);
       }
 
       LOG.info("Successfully synced BackupNode with NameNode at txnid " +

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Sat Jan  7 16:28:27 2012
@@ -57,12 +57,31 @@ class BackupJournalManager implements Jo
       throws IOException {
   }
 
+  @Override
+  public long getNumberOfTransactions(long fromTxnId) 
+      throws IOException, CorruptionException {
+    // This JournalManager is never used for input. Therefore it cannot
+    // return any transactions
+    return 0;
+  }
+  
+  @Override
+  public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+    // This JournalManager is never used for input. Therefore it cannot
+    // return any transactions
+    throw new IOException("Unsupported operation");
+  }
+
+  @Override
+  public void recoverUnfinalizedSegments() throws IOException {
+  }
+
   public boolean matchesRegistration(NamenodeRegistration bnReg) {
     return bnReg.getAddress().equals(this.bnReg.getAddress());
   }
 
   @Override
-  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
-    return null;
+  public String toString() {
+    return "BackupJournalManager";
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Jan  7 16:28:27 2012
@@ -275,16 +275,17 @@ class Checkpointer extends Daemon {
       FSImage dstImage) throws IOException {
     NNStorage dstStorage = dstImage.getStorage();
   
-    List<File> editsFiles = Lists.newArrayList();
+    List<EditLogInputStream> editsStreams = Lists.newArrayList();    
     for (RemoteEditLog log : manifest.getLogs()) {
       File f = dstStorage.findFinalizedEditsFile(
           log.getStartTxId(), log.getEndTxId());
       if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
-        editsFiles.add(f);
-      }
+        editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(), 
+                                                    log.getEndTxId()));
+       }
     }
     LOG.info("Checkpointer about to load edits from " +
-        editsFiles.size() + " file(s).");
-    dstImage.loadEdits(editsFiles);
+        editsStreams.size() + " stream(s).");
+    dstImage.loadEdits(editsStreams);
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Sat Jan  7 16:28:27 2012
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import com.google.common.base.Preconditions;
 
 /**
@@ -122,4 +123,14 @@ class EditLogBackupInputStream extends E
     reader = null;
     this.version = 0;
   }
+
+  @Override
+  public long getFirstTxId() throws IOException {
+    return HdfsConstants.INVALID_TXID;
+  }
+
+  @Override
+  public long getLastTxId() throws IOException {
+    return HdfsConstants.INVALID_TXID;
+  }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Sat Jan  7 16:28:27 2012
@@ -27,6 +27,7 @@ import java.io.DataInputStream;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -37,12 +38,15 @@ import com.google.common.annotations.Vis
 class EditLogFileInputStream extends EditLogInputStream {
   private final File file;
   private final FileInputStream fStream;
+  final private long firstTxId;
+  final private long lastTxId;
   private final int logVersion;
   private final FSEditLogOp.Reader reader;
   private final FSEditLogLoader.PositionTrackingInputStream tracker;
   
   /**
    * Open an EditLogInputStream for the given file.
+   * The file is pre-transactional, so it has no txids.
    * @param name filename to open
    * @throws LogHeaderCorruptException if the header is either missing or
    *         appears to be corrupt/truncated
@@ -51,6 +55,21 @@ class EditLogFileInputStream extends Edi
    */
   EditLogFileInputStream(File name)
       throws LogHeaderCorruptException, IOException {
+    this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
+  }
+
+  /**
+   * Open an EditLogInputStream for the given file.
+   * @param name filename to open
+   * @param firstTxId first transaction found in file
+   * @param lastTxId last transaction id found in file
+   * @throws LogHeaderCorruptException if the header is either missing or
+   *         appears to be corrupt/truncated
+   * @throws IOException if an actual IO error occurs while reading the
+   *         header
+   */
+  EditLogFileInputStream(File name, long firstTxId, long lastTxId)
+      throws LogHeaderCorruptException, IOException {
     file = name;
     fStream = new FileInputStream(name);
 
@@ -65,6 +84,18 @@ class EditLogFileInputStream extends Edi
     }
 
     reader = new FSEditLogOp.Reader(in, logVersion);
+    this.firstTxId = firstTxId;
+    this.lastTxId = lastTxId;
+  }
+
+  @Override
+  public long getFirstTxId() throws IOException {
+    return firstTxId;
+  }
+  
+  @Override
+  public long getLastTxId() throws IOException {
+    return lastTxId;
   }
 
   @Override // JournalStream
@@ -116,7 +147,8 @@ class EditLogFileInputStream extends Edi
       // If it's missing its header, this is equivalent to no transactions
       FSImage.LOG.warn("Log at " + file + " has no valid header",
           corrupt);
-      return new FSEditLogLoader.EditLogValidation(0, 0);
+      return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, 
+                                                   HdfsConstants.INVALID_TXID);
     }
     
     try {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Sat Jan  7 16:28:27 2012
@@ -28,6 +28,17 @@ import java.io.IOException;
  * into the #{@link EditLogOutputStream}.
  */
 abstract class EditLogInputStream implements JournalStream, Closeable {
+  /** 
+   * @return the first transaction which will be found in this stream
+   */
+  public abstract long getFirstTxId() throws IOException;
+  
+  /** 
+   * @return the last transaction which will be found in this stream
+   */
+  public abstract long getLastTxId() throws IOException;
+
+
   /**
    * Close the stream.
    * @throws IOException if an error occurred while closing

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sat Jan  7 16:28:27 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -35,11 +36,13 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -1069,6 +1072,112 @@ public class FSEditLog  {
   }
 
   /**
+   * Find the best editlog input stream to read from txid. In this case
+   * best means the editlog which has the largest continuous range of 
+   * transactions starting from the transaction id, fromTxId.
+   *
+   * If a journal throws a CorruptionException while reading from a txn id,
+   * it means that it has more transactions, but can't find any from fromTxId. 
+   * If this is the case and no other journal has transactions, we should throw
+   * an exception as it means more transactions exist, we just can't load them.
+   *
+   * @param fromTxId Transaction id to start from.
+   * @return an edit log input stream with transactions starting at fromTxId,
+   *         or null if no more exist
+   */
+  private EditLogInputStream selectStream(long fromTxId) 
+      throws IOException {
+    JournalManager bestjm = null;
+    long bestjmNumTxns = 0;
+    CorruptionException corruption = null;
+
+    for (JournalAndStream jas : journals) {
+      JournalManager candidate = jas.getManager();
+      long candidateNumTxns = 0;
+      try {
+        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
+      } catch (CorruptionException ce) {
+        corruption = ce;
+      } catch (IOException ioe) {
+        LOG.warn("Error reading number of transactions from " + candidate);
+        continue; // error reading disk, just skip
+      }
+      
+      if (candidateNumTxns > bestjmNumTxns) {
+        bestjm = candidate;
+        bestjmNumTxns = candidateNumTxns;
+      }
+    }
+    
+    
+    if (bestjm == null) {
+      /**
+       * If all candidates either threw a CorruptionException or
+       * found 0 transactions, then a gap exists. 
+       */
+      if (corruption != null) {
+        throw new IOException("Gap exists in logs from " 
+                              + fromTxId, corruption);
+      } else {
+        return null;
+      }
+    }
+
+    return bestjm.getInputStream(fromTxId);
+  }
+
+  /**
+   * Run recovery on all journals to recover any unclosed segments
+   */
+  void recoverUnclosedStreams() {
+    mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          jas.manager.recoverUnfinalizedSegments();
+        }
+      }, "recovering unclosed streams");
+  }
+
+  /**
+   * Select a list of input streams to load.
+   * @param fromTxId first transaction in the selected streams
+   * @param toAtLeastTxId the selected streams must contain this transaction
+   */
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId) 
+      throws IOException {
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    
+    boolean gapFound = false;
+    EditLogInputStream stream = selectStream(fromTxId);
+    while (stream != null) {
+      fromTxId = stream.getLastTxId() + 1;
+      streams.add(stream);
+      try {
+        stream = selectStream(fromTxId);
+      } catch (IOException ioe) {
+        gapFound = true;
+        break;
+      }
+    }
+    if (fromTxId <= toAtLeastTxId || gapFound) {
+      closeAllStreams(streams);
+      throw new IOException("No non-corrupt logs for txid " 
+                            + fromTxId);
+    }
+    return streams;
+  }
+
+  /** 
+   * Close all the streams in a collection
+   * @param streams The list of streams to close
+   */
+  static void closeAllStreams(Iterable<EditLogInputStream> streams) {
+    for (EditLogInputStream s : streams) {
+      IOUtils.closeStream(s);
+    }
+  }
+
+  /**
    * Container for a JournalManager paired with its currently
    * active stream.
    * 
@@ -1137,30 +1246,5 @@ public class FSEditLog  {
     JournalManager getManager() {
       return manager;
     }
-
-    private EditLogInputStream getInProgressInputStream() throws IOException {
-      return manager.getInProgressInputStream(segmentStartsAtTxId);
-    }
-  }
-
-  /**
-   * @return an EditLogInputStream that reads from the same log that
-   * the edit log is currently writing. This is used from the BackupNode
-   * during edits synchronization.
-   * @throws IOException if no valid logs are available.
-   */
-  synchronized EditLogInputStream getInProgressFileInputStream()
-      throws IOException {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      try {
-        EditLogInputStream in = jas.getInProgressInputStream();
-        if (in != null) return in;
-      } catch (IOException ioe) {
-        LOG.warn("Unable to get the in-progress input stream from " + jas,
-            ioe);
-      }
-    }
-    throw new IOException("No in-progress stream provided edits");
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Sat Jan  7 16:28:27 2012
@@ -446,24 +446,6 @@ public class FSEditLogLoader {
     }
   }
   
-  static EditLogValidation validateEditLog(File file) throws IOException {
-    EditLogFileInputStream in;
-    try {
-      in = new EditLogFileInputStream(file);
-    } catch (LogHeaderCorruptException corrupt) {
-      // If it's missing its header, this is equivalent to no transactions
-      FSImage.LOG.warn("Log at " + file + " has no valid header",
-          corrupt);
-      return new EditLogValidation(0, 0);
-    }
-    
-    try {
-      return validateEditLog(in);
-    } finally {
-      IOUtils.closeStream(in);
-    }
-  }
-
   /**
    * Return the number of valid transactions in the stream. If the stream is
    * truncated during the header, returns a value indicating that there are
@@ -473,12 +455,26 @@ public class FSEditLogLoader {
    *                     if the log does not exist)
    */
   static EditLogValidation validateEditLog(EditLogInputStream in) {
-    long numValid = 0;
     long lastPos = 0;
+    long firstTxId = HdfsConstants.INVALID_TXID;
+    long lastTxId = HdfsConstants.INVALID_TXID;
+    long numValid = 0;
     try {
+      FSEditLogOp op = null;
       while (true) {
         lastPos = in.getPosition();
-        if (in.readOp() == null) {
+        if ((op = in.readOp()) == null) {
+          break;
+        }
+        if (firstTxId == HdfsConstants.INVALID_TXID) {
+          firstTxId = op.txid;
+        }
+        if (lastTxId == HdfsConstants.INVALID_TXID
+            || op.txid == lastTxId + 1) {
+          lastTxId = op.txid;
+        } else {
+          FSImage.LOG.error("Out of order txid found. Found " + op.txid 
+                            + ", expected " + (lastTxId + 1));
           break;
         }
         numValid++;
@@ -489,16 +485,33 @@ public class FSEditLogLoader {
       FSImage.LOG.debug("Caught exception after reading " + numValid +
           " ops from " + in + " while determining its valid length.", t);
     }
-    return new EditLogValidation(lastPos, numValid);
+    return new EditLogValidation(lastPos, firstTxId, lastTxId);
   }
   
   static class EditLogValidation {
-    long validLength;
-    long numTransactions;
-    
-    EditLogValidation(long validLength, long numTransactions) {
+    private long validLength;
+    private long startTxId;
+    private long endTxId;
+     
+    EditLogValidation(long validLength, 
+                      long startTxId, long endTxId) {
       this.validLength = validLength;
-      this.numTransactions = numTransactions;
+      this.startTxId = startTxId;
+      this.endTxId = endTxId;
+    }
+    
+    long getValidLength() { return validLength; }
+    
+    long getStartTxId() { return startTxId; }
+    
+    long getEndTxId() { return endTxId; }
+    
+    long getNumTransactions() { 
+      if (endTxId == HdfsConstants.INVALID_TXID
+          || startTxId == HdfsConstants.INVALID_TXID) {
+        return 0;
+      }
+      return (endTxId - startTxId) + 1;
     }
   }
 

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Jan  7 16:28:27 2012
@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.com
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
+
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -584,32 +584,38 @@ public class FSImage implements Closeabl
     FSImageStorageInspector inspector = storage.readAndInspectDirs();
     
     isUpgradeFinalized = inspector.isUpgradeFinalized();
-    
+ 
+    FSImageStorageInspector.FSImageFile imageFile 
+      = inspector.getLatestImage();   
     boolean needToSave = inspector.needToSave();
-    
-    // Plan our load. This will throw if it's impossible to load from the
-    // data that's available.
-    LoadPlan loadPlan = inspector.createLoadPlan();    
-    LOG.debug("Planning to load image using following plan:\n" + loadPlan);
-
-    
-    // Recover from previous interrupted checkpoint, if any
-    needToSave |= loadPlan.doRecovery();
-
-    //
-    // Load in bits
-    //
-    StorageDirectory sdForProperties =
-      loadPlan.getStorageDirectoryForProperties();
-    storage.readProperties(sdForProperties);
-    File imageFile = loadPlan.getImageFile();
 
+    Iterable<EditLogInputStream> editStreams = null;
+
+    editLog.recoverUnclosedStreams();
+
+    if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
+                               getLayoutVersion())) {
+      editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
+                                               inspector.getMaxSeenTxId());
+    } else {
+      editStreams = FSImagePreTransactionalStorageInspector
+        .getEditLogStreams(storage);
+    }
+ 
+    LOG.debug("Planning to load image :\n" + imageFile);
+    for (EditLogInputStream l : editStreams) {
+      LOG.debug("\t Planning to load edit stream: " + l);
+    }
+    
     try {
+      StorageDirectory sdForProperties = imageFile.sd;
+      storage.readProperties(sdForProperties);
+
       if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
                                  getLayoutVersion())) {
         // For txid-based layout, we should have a .md5 file
         // next to the image file
-        loadFSImage(imageFile);
+        loadFSImage(imageFile.getFile());
       } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
                                         getLayoutVersion())) {
         // In 0.22, we have the checksum stored in the VERSION file.
@@ -621,17 +627,19 @@ public class FSImage implements Closeabl
               NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
               " not set for storage directory " + sdForProperties.getRoot());
         }
-        loadFSImage(imageFile, new MD5Hash(md5));
+        loadFSImage(imageFile.getFile(), new MD5Hash(md5));
       } else {
         // We don't have any record of the md5sum
-        loadFSImage(imageFile, null);
+        loadFSImage(imageFile.getFile(), null);
       }
     } catch (IOException ioe) {
-      throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
+      FSEditLog.closeAllStreams(editStreams);
+      throw new IOException("Failed to load image from " + imageFile, ioe);
     }
     
-    long numLoaded = loadEdits(loadPlan.getEditsFiles());
-    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile, numLoaded);
+    long numLoaded = loadEdits(editStreams);
+    needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
+                                                    numLoaded);
     
     // update the txid for the edit log
     editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
@@ -663,22 +671,25 @@ public class FSImage implements Closeabl
    * Load the specified list of edit files into the image.
    * @return the number of transactions loaded
    */
-  protected long loadEdits(List<File> editLogs) throws IOException {
-    LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editLogs));
+  protected long loadEdits(Iterable<EditLogInputStream> editStreams) throws IOException {
+    LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
 
     long startingTxId = getLastAppliedTxId() + 1;
-    
-    FSEditLogLoader loader = new FSEditLogLoader(namesystem);
     int numLoaded = 0;
-    // Load latest edits
-    for (File edits : editLogs) {
-      LOG.debug("Reading " + edits + " expecting start txid #" + startingTxId);
-      EditLogFileInputStream editIn = new EditLogFileInputStream(edits);
-      int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
-      startingTxId += thisNumLoaded;
-      numLoaded += thisNumLoaded;
-      lastAppliedTxId += thisNumLoaded;
-      editIn.close();
+
+    try {    
+      FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+      
+      // Load latest edits
+      for (EditLogInputStream editIn : editStreams) {
+        LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
+        int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
+        startingTxId += thisNumLoaded;
+        numLoaded += thisNumLoaded;
+        lastAppliedTxId += thisNumLoaded;
+      }
+    } finally {
+      FSEditLog.closeAllStreams(editStreams);
     }
 
     // update the counts

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java Sat Jan  7 16:28:27 2012
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -55,6 +56,7 @@ class FSImagePreTransactionalStorageInsp
   private boolean hasOutOfDateStorageDirs = false;
   /* Flag set false if there are any "previous" directories found */
   private boolean isUpgradeFinalized = true;
+  private boolean needToSaveAfterRecovery = false;
   
   // Track the name and edits dir with the latest times
   private long latestNameCheckpointTime = Long.MIN_VALUE;
@@ -139,15 +141,15 @@ class FSImagePreTransactionalStorageInsp
   boolean isUpgradeFinalized() {
     return isUpgradeFinalized;
   }
-  
+    
   @Override
-  LoadPlan createLoadPlan() throws IOException {
+  FSImageFile getLatestImage() throws IOException {
     // We should have at least one image and one edits dirs
     if (latestNameSD == null)
       throw new IOException("Image file is not found in " + imageDirs);
     if (latestEditsSD == null)
       throw new IOException("Edits file is not found in " + editsDirs);
-
+    
     // Make sure we are loading image and edits from same checkpoint
     if (latestNameCheckpointTime > latestEditsCheckpointTime
         && latestNameSD != latestEditsSD
@@ -168,92 +170,70 @@ class FSImagePreTransactionalStorageInsp
                       "image checkpoint time = " + latestNameCheckpointTime +
                       "edits checkpoint time = " + latestEditsCheckpointTime);
     }
+
+    needToSaveAfterRecovery = doRecovery();
     
-    return new PreTransactionalLoadPlan();
+    return new FSImageFile(latestNameSD, 
+        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE),
+        HdfsConstants.INVALID_TXID);
   }
-  
+
   @Override
   boolean needToSave() {
     return hasOutOfDateStorageDirs ||
       checkpointTimes.size() != 1 ||
-      latestNameCheckpointTime > latestEditsCheckpointTime;
-
+      latestNameCheckpointTime > latestEditsCheckpointTime ||
+      needToSaveAfterRecovery;
   }
   
-  private class PreTransactionalLoadPlan extends LoadPlan {
-
-    @Override
-    boolean doRecovery() throws IOException {
-      LOG.debug(
+  boolean doRecovery() throws IOException {
+    LOG.debug(
         "Performing recovery in "+ latestNameSD + " and " + latestEditsSD);
       
-      boolean needToSave = false;
-      File curFile =
-        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
-      File ckptFile =
-        NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
-
-      //
-      // If we were in the midst of a checkpoint
-      //
-      if (ckptFile.exists()) {
-        needToSave = true;
-        if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
-              .exists()) {
-          //
-          // checkpointing migth have uploaded a new
-          // merged image, but we discard it here because we are
-          // not sure whether the entire merged image was uploaded
-          // before the namenode crashed.
-          //
-          if (!ckptFile.delete()) {
-            throw new IOException("Unable to delete " + ckptFile);
-          }
-        } else {
-          //
-          // checkpointing was in progress when the namenode
-          // shutdown. The fsimage.ckpt was created and the edits.new
-          // file was moved to edits. We complete that checkpoint by
-          // moving fsimage.new to fsimage. There is no need to 
-          // update the fstime file here. renameTo fails on Windows
-          // if the destination file already exists.
-          //
+    boolean needToSave = false;
+    File curFile =
+      NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
+    File ckptFile =
+      NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
+    
+    //
+    // If we were in the midst of a checkpoint
+    //
+    if (ckptFile.exists()) {
+      needToSave = true;
+      if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
+          .exists()) {
+        //
+        // checkpointing might have uploaded a new
+        // merged image, but we discard it here because we are
+        // not sure whether the entire merged image was uploaded
+        // before the namenode crashed.
+        //
+        if (!ckptFile.delete()) {
+          throw new IOException("Unable to delete " + ckptFile);
+        }
+      } else {
+        //
+        // checkpointing was in progress when the namenode
+        // shutdown. The fsimage.ckpt was created and the edits.new
+        // file was moved to edits. We complete that checkpoint by
+        // moving fsimage.new to fsimage. There is no need to 
+        // update the fstime file here. renameTo fails on Windows
+        // if the destination file already exists.
+        //
+        if (!ckptFile.renameTo(curFile)) {
+          if (!curFile.delete())
+            LOG.warn("Unable to delete dir " + curFile + " before rename");
           if (!ckptFile.renameTo(curFile)) {
-            if (!curFile.delete())
-              LOG.warn("Unable to delete dir " + curFile + " before rename");
-            if (!ckptFile.renameTo(curFile)) {
-              throw new IOException("Unable to rename " + ckptFile +
-                                    " to " + curFile);
-            }
+            throw new IOException("Unable to rename " + ckptFile +
+                                  " to " + curFile);
           }
         }
       }
-      return needToSave;
-    }
-
-    @Override
-    File getImageFile() {
-      return NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
     }
-
-    @Override
-    List<File> getEditsFiles() {
-      if (latestNameCheckpointTime > latestEditsCheckpointTime) {
-        // the image is already current, discard edits
-        LOG.debug(
-          "Name checkpoint time is newer than edits, not loading edits.");
-        return Collections.<File>emptyList();
-      }
-      
-      return getEditsInStorageDir(latestEditsSD);
-    }
-
-    @Override
-    StorageDirectory getStorageDirectoryForProperties() {
-      return latestNameSD;
-    }    
+    return needToSave;
   }
-
+  
   /**
    * @return a list with the paths to EDITS and EDITS_NEW (if it exists)
    * in a given storage directory.
@@ -269,4 +249,33 @@ class FSImagePreTransactionalStorageInsp
     }
     return files;
   }
+  
+  private List<File> getLatestEditsFiles() {
+    if (latestNameCheckpointTime > latestEditsCheckpointTime) {
+      // the image is already current, discard edits
+      LOG.debug(
+          "Name checkpoint time is newer than edits, not loading edits.");
+      return Collections.<File>emptyList();
+    }
+    
+    return getEditsInStorageDir(latestEditsSD);
+  }
+  
+  @Override
+  long getMaxSeenTxId() {
+    return 0L;
+  }
+
+  static Iterable<EditLogInputStream> getEditLogStreams(NNStorage storage)
+      throws IOException {
+    FSImagePreTransactionalStorageInspector inspector 
+      = new FSImagePreTransactionalStorageInspector();
+    storage.inspectStorageDirs(inspector);
+
+    List<EditLogInputStream> editStreams = new ArrayList<EditLogInputStream>();
+    for (File f : inspector.getLatestEditsFiles()) {
+      editStreams.add(new EditLogFileInputStream(f));
+    }
+    return editStreams;
+  }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java Sat Jan  7 16:28:27 2012
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -43,11 +44,16 @@ abstract class FSImageStorageInspector {
   abstract boolean isUpgradeFinalized();
   
   /**
-   * Create a plan to load the image from the set of inspected storage directories.
+   * Get the image file which should be loaded into the filesystem.
    * @throws IOException if not enough files are available (eg no image found in any directory)
    */
-  abstract LoadPlan createLoadPlan() throws IOException;
-  
+  abstract FSImageFile getLatestImage() throws IOException;
+
+  /** 
+   * Get the maximum txid seen in the inspected storage directories.
+   */
+  abstract long getMaxSeenTxId();
+
   /**
    * @return true if the directories are in such a state that the image should be re-saved
    * following the load
@@ -55,49 +61,6 @@ abstract class FSImageStorageInspector {
   abstract boolean needToSave();
 
   /**
-   * A plan to load the namespace from disk, providing the locations from which to load
-   * the image and a set of edits files.
-   */
-  abstract static class LoadPlan {
-    /**
-     * Execute atomic move sequence in the chosen storage directories,
-     * in order to recover from an interrupted checkpoint.
-     * @return true if some recovery action was taken
-     */
-    abstract boolean doRecovery() throws IOException;
-
-    /**
-     * @return the file from which to load the image data
-     */
-    abstract File getImageFile();
-    
-    /**
-     * @return a list of flies containing edits to replay
-     */
-    abstract List<File> getEditsFiles();
-    
-    /**
-     * @return the storage directory containing the VERSION file that should be
-     * loaded.
-     */
-    abstract StorageDirectory getStorageDirectoryForProperties();
-    
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Will load image file: ").append(getImageFile()).append("\n");
-      sb.append("Will load edits files:").append("\n");
-      for (File f : getEditsFiles()) {
-        sb.append("  ").append(f).append("\n");
-      }
-      sb.append("Will load metadata from: ")
-        .append(getStorageDirectoryForProperties())
-        .append("\n");
-      return sb.toString();
-    }
-  }
-
-  /**
    * Record of an image that has been located and had its filename parsed.
    */
   static class FSImageFile {
@@ -106,7 +69,8 @@ abstract class FSImageStorageInspector {
     private final File file;
     
     FSImageFile(StorageDirectory sd, File file, long txId) {
-      assert txId >= 0 : "Invalid txid on " + file +": " + txId;
+      assert txId >= 0 || txId == HdfsConstants.INVALID_TXID 
+        : "Invalid txid on " + file +": " + txId;
       
       this.sd = sd;
       this.txId = txId;

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Sat Jan  7 16:28:27 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -52,9 +51,7 @@ class FSImageTransactionalStorageInspect
   private boolean isUpgradeFinalized = true;
   
   List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
-  List<EditLogFile> foundEditLogs = new ArrayList<EditLogFile>();
-  SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
-  long maxSeenTxId = 0;
+  private long maxSeenTxId = 0;
   
   private static final Pattern IMAGE_REGEX = Pattern.compile(
     NameNodeFile.IMAGE.getName() + "_(\\d+)");
@@ -68,6 +65,8 @@ class FSImageTransactionalStorageInspect
       return;
     }
     
+    maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+
     File currentDir = sd.getCurrentDir();
     File filesInStorage[];
     try {
@@ -110,34 +109,10 @@ class FSImageTransactionalStorageInspect
       LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
     }
     
-    List<EditLogFile> editLogs 
-      = FileJournalManager.matchEditLogs(filesInStorage);
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-      for (EditLogFile log : editLogs) {
-        addEditLog(log);
-      }
-    } else if (!editLogs.isEmpty()){
-      LOG.warn("Found the following edit log file(s) in " + sd +
-          " even though it was not configured to store edits:\n" +
-          "  " + Joiner.on("\n  ").join(editLogs));
-          
-    }
-    
     // set finalized flag
     isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
   }
 
-  private void addEditLog(EditLogFile foundEditLog) {
-    foundEditLogs.add(foundEditLog);
-    LogGroup group = logGroups.get(foundEditLog.getFirstTxId());
-    if (group == null) {
-      group = new LogGroup(foundEditLog.getFirstTxId());
-      logGroups.put(foundEditLog.getFirstTxId(), group);
-    }
-    group.add(foundEditLog);
-  }
-
-
   @Override
   public boolean isUpgradeFinalized() {
     return isUpgradeFinalized;
@@ -148,9 +123,13 @@ class FSImageTransactionalStorageInspect
    * If there are multiple storage directories which contain equal images 
    * the storage directory that was inspected first will be preferred.
    * 
-   * Returns null if no images were found.
+   * @throws FileNotFoundException if no images are found.
    */
-  FSImageFile getLatestImage() {
+  FSImageFile getLatestImage() throws IOException {
+    if (foundImages.isEmpty()) {
+      throw new FileNotFoundException("No valid image files found");
+    }
+
     FSImageFile ret = null;
     for (FSImageFile img : foundImages) {
       if (ret == null || img.txId > ret.txId) {
@@ -164,349 +143,13 @@ class FSImageTransactionalStorageInspect
     return ImmutableList.copyOf(foundImages);
   }
   
-  public List<EditLogFile> getEditLogFiles() {
-    return ImmutableList.copyOf(foundEditLogs);
-  }
-
-  @Override
-  public LoadPlan createLoadPlan() throws IOException {
-    if (foundImages.isEmpty()) {
-      throw new FileNotFoundException("No valid image files found");
-    }
-
-    FSImageFile recoveryImage = getLatestImage();
-    LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
-
-    return new TransactionalLoadPlan(recoveryImage,
-        logPlan);
-  }
-  
-  /**
-   * Plan which logs to load in order to bring the namespace up-to-date.
-   * Transactions will be considered in the range (sinceTxId, maxTxId]
-   * 
-   * @param sinceTxId the highest txid that is already loaded 
-   *                  (eg from the image checkpoint)
-   * @param maxStartTxId ignore any log files that start after this txid
-   */
-  LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
-    long expectedTxId = sinceTxId + 1;
-    
-    List<EditLogFile> recoveryLogs = new ArrayList<EditLogFile>();
-    
-    SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
-    if (logGroups.size() > tailGroups.size()) {
-      LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + 
-          " groups of logs because they start with a txid less than image " +
-          "txid " + sinceTxId);
-    }
-    
-    SortedMap<Long, LogGroup> usefulGroups;
-    if (maxStartTxId > sinceTxId) {
-      usefulGroups = tailGroups.headMap(maxStartTxId);
-    } else {
-      usefulGroups = new TreeMap<Long, LogGroup>();
-    }
-    
-    if (usefulGroups.size() > tailGroups.size()) {
-      LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) + 
-        " groups of logs because they start with a txid higher than max " +
-        "txid " + sinceTxId);
-    }
-
-
-    for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
-      long logStartTxId = entry.getKey();
-      LogGroup logGroup = entry.getValue();
-      
-      logGroup.planRecovery();
-      
-      if (expectedTxId != HdfsConstants.INVALID_TXID && logStartTxId != expectedTxId) {
-        throw new IOException("Expected next log group would start at txid " +
-            expectedTxId + " but starts at txid " + logStartTxId);
-      }
-      
-      // We can pick any of the non-corrupt logs here
-      recoveryLogs.add(logGroup.getBestNonCorruptLog());
-      
-      // If this log group was finalized, we know to expect the next
-      // log group to start at the following txid (ie no gaps)
-      if (logGroup.hasKnownLastTxId()) {
-        expectedTxId = logGroup.getLastTxId() + 1;
-      } else {
-        // the log group was in-progress so we don't know what ID
-        // the next group should start from.
-        expectedTxId = HdfsConstants.INVALID_TXID;
-      }
-    }
-    
-    long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
-        0 : usefulGroups.lastKey();
-    if (maxSeenTxId > sinceTxId &&
-        maxSeenTxId > lastLogGroupStartTxId) {
-      String msg = "At least one storage directory indicated it has seen a " +
-        "log segment starting at txid " + maxSeenTxId;
-      if (usefulGroups.isEmpty()) {
-        msg += " but there are no logs to load.";
-      } else {
-        msg += " but the most recent log file found starts with txid " +
-          lastLogGroupStartTxId;
-      }
-      throw new IOException(msg);
-    }
-    
-    return new LogLoadPlan(recoveryLogs,
-        Lists.newArrayList(usefulGroups.values()));
-
-  }
-
   @Override
   public boolean needToSave() {
     return needToSave;
   }
-  
-  /**
-   * A group of logs that all start at the same txid.
-   * 
-   * Handles determining which logs are corrupt and which should be considered
-   * candidates for loading.
-   */
-  static class LogGroup {
-    long startTxId;
-    List<EditLogFile> logs = new ArrayList<EditLogFile>();;
-    private Set<Long> endTxIds = new TreeSet<Long>();
-    private boolean hasInProgress = false;
-    private boolean hasFinalized = false;
-        
-    LogGroup(long startTxId) {
-      this.startTxId = startTxId;
-    }
-    
-    EditLogFile getBestNonCorruptLog() {
-      // First look for non-corrupt finalized logs
-      for (EditLogFile log : logs) {
-        if (!log.isCorrupt() && !log.isInProgress()) {
-          return log;
-        }
-      }
-      // Then look for non-corrupt in-progress logs
-      for (EditLogFile log : logs) {
-        if (!log.isCorrupt()) {
-          return log;
-        }
-      }
 
-      // We should never get here, because we don't get to the planning stage
-      // without calling planRecovery first, and if we've called planRecovery,
-      // we would have already thrown if there were no non-corrupt logs!
-      throw new IllegalStateException(
-        "No non-corrupt logs for txid " + startTxId);
-    }
-
-    /**
-     * @return true if we can determine the last txid in this log group.
-     */
-    boolean hasKnownLastTxId() {
-      for (EditLogFile log : logs) {
-        if (!log.isInProgress()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /**
-     * @return the last txid included in the logs in this group
-     * @throws IllegalStateException if it is unknown -
-     *                               {@see #hasKnownLastTxId()}
-     */
-    long getLastTxId() {
-      for (EditLogFile log : logs) {
-        if (!log.isInProgress()) {
-          return log.getLastTxId();
-        }
-      }
-      throw new IllegalStateException("LogGroup only has in-progress logs");
-    }
-
-    
-    void add(EditLogFile log) {
-      assert log.getFirstTxId() == startTxId;
-      logs.add(log);
-      
-      if (log.isInProgress()) {
-        hasInProgress = true;
-      } else {
-        hasFinalized = true;
-        endTxIds.add(log.getLastTxId());
-      }
-    }
-    
-    void planRecovery() throws IOException {
-      assert hasInProgress || hasFinalized;
-      
-      checkConsistentEndTxIds();
-        
-      if (hasFinalized && hasInProgress) {
-        planMixedLogRecovery();
-      } else if (!hasFinalized && hasInProgress) {
-        planAllInProgressRecovery();
-      } else if (hasFinalized && !hasInProgress) {
-        LOG.debug("No recovery necessary for logs starting at txid " +
-                  startTxId);
-      }
-    }
-
-    /**
-     * Recovery case for when some logs in the group were in-progress, and
-     * others were finalized. This happens when one of the storage
-     * directories fails.
-     *
-     * The in-progress logs in this case should be considered corrupt.
-     */
-    private void planMixedLogRecovery() throws IOException {
-      for (EditLogFile log : logs) {
-        if (log.isInProgress()) {
-          LOG.warn("Log at " + log.getFile() + " is in progress, but " +
-                   "other logs starting at the same txid " + startTxId +
-                   " are finalized. Moving aside.");
-          log.markCorrupt();
-        }
-      }
-    }
-    
-    /**
-     * Recovery case for when all of the logs in the group were in progress.
-     * This happens if the NN completely crashes and restarts. In this case
-     * we check the non-zero lengths of each log file, and any logs that are
-     * less than the max of these lengths are considered corrupt.
-     */
-    private void planAllInProgressRecovery() throws IOException {
-      // We only have in-progress logs. We need to figure out which logs have
-      // the latest data to reccover them
-      LOG.warn("Logs beginning at txid " + startTxId + " were are all " +
-               "in-progress (probably truncated due to a previous NameNode " +
-               "crash)");
-      if (logs.size() == 1) {
-        // Only one log, it's our only choice!
-        EditLogFile log = logs.get(0);
-        if (log.validateLog().numTransactions == 0) {
-          // If it has no transactions, we should consider it corrupt just
-          // to be conservative.
-          // See comment below for similar case
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-              "it has no transactions in it.");
-          log.markCorrupt();          
-        }
-        return;
-      }
-
-      long maxValidTxnCount = Long.MIN_VALUE;
-      for (EditLogFile log : logs) {
-        long validTxnCount = log.validateLog().numTransactions;
-        LOG.warn("  Log " + log.getFile() +
-            " valid txns=" + validTxnCount +
-            " valid len=" + log.validateLog().validLength);
-        maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
-      }        
-
-      for (EditLogFile log : logs) {
-        long txns = log.validateLog().numTransactions;
-        if (txns < maxValidTxnCount) {
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-                   "it is has only " + txns + " valid txns whereas another " +
-                   "log has " + maxValidTxnCount);
-          log.markCorrupt();
-        } else if (txns == 0) {
-          // this can happen if the NN crashes right after rolling a log
-          // but before the START_LOG_SEGMENT txn is written. Since the log
-          // is empty, we can just move it aside to its corrupt name.
-          LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
-              "it has no transactions in it.");
-          log.markCorrupt();
-        }
-      }
-    }
-
-    /**
-     * Check for the case when we have multiple finalized logs and they have
-     * different ending transaction IDs. This violates an invariant that all
-     * log directories should roll together. We should abort in this case.
-     */
-    private void checkConsistentEndTxIds() throws IOException {
-      if (hasFinalized && endTxIds.size() > 1) {
-        throw new IOException("More than one ending txid was found " +
-            "for logs starting at txid " + startTxId + ". " +
-            "Found: " + StringUtils.join(endTxIds, ','));
-      }
-    }
-
-    void recover() throws IOException {
-      for (EditLogFile log : logs) {
-        if (log.isCorrupt()) {
-          log.moveAsideCorruptFile();
-        } else if (log.isInProgress()) {
-          log.finalizeLog();
-        }
-      }
-    }    
-  }
-  
-  static class TransactionalLoadPlan extends LoadPlan {
-    final FSImageFile image;
-    final LogLoadPlan logPlan;
-    
-    public TransactionalLoadPlan(FSImageFile image,
-        LogLoadPlan logPlan) {
-      super();
-      this.image = image;
-      this.logPlan = logPlan;
-    }
-
-    @Override
-    boolean doRecovery() throws IOException {
-      logPlan.doRecovery();
-      return false;
-    }
-
-    @Override
-    File getImageFile() {
-      return image.getFile();
-    }
-
-    @Override
-    List<File> getEditsFiles() {
-      return logPlan.getEditsFiles();
-    }
-
-    @Override
-    StorageDirectory getStorageDirectoryForProperties() {
-      return image.sd;
-    }
-  }
-  
-  static class LogLoadPlan {
-    final List<EditLogFile> editLogs;
-    final List<LogGroup> logGroupsToRecover;
-    
-    LogLoadPlan(List<EditLogFile> editLogs,
-        List<LogGroup> logGroupsToRecover) {
-      this.editLogs = editLogs;
-      this.logGroupsToRecover = logGroupsToRecover;
-    }
-
-    public void doRecovery() throws IOException {
-      for (LogGroup g : logGroupsToRecover) {
-        g.recover();
-      }
-    }
-
-    public List<File> getEditsFiles() {
-      List<File> ret = new ArrayList<File>();
-      for (EditLogFile log : editLogs) {
-        ret.add(log.getFile());
-      }
-      return ret;
-    }
+  @Override
+  long getMaxSeenTxId() {
+    return maxSeenTxId;
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Sat Jan  7 16:28:27 2012
@@ -23,11 +23,14 @@ import org.apache.commons.logging.LogFac
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Comparator;
+import java.util.Collections;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
@@ -57,6 +60,9 @@ class FileJournalManager implements Jour
   private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
     NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
 
+  private File currentInProgress = null;
+  private long maxSeenTransaction = 0L;
+
   @VisibleForTesting
   StoragePurger purger
     = new NNStorageRetentionManager.DeletionStoragePurger();
@@ -66,19 +72,20 @@ class FileJournalManager implements Jour
   }
 
   @Override
-  public EditLogOutputStream startLogSegment(long txid) throws IOException {    
-    File newInProgress = NNStorage.getInProgressEditsFile(sd, txid);
-    EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress,
+  synchronized public EditLogOutputStream startLogSegment(long txid) 
+      throws IOException {
+    currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
+    EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
         outputBufferCapacity);
     stm.create();
     return stm;
   }
 
   @Override
-  public void finalizeLogSegment(long firstTxId, long lastTxId)
+  synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
       throws IOException {
-    File inprogressFile = NNStorage.getInProgressEditsFile(
-        sd, firstTxId);
+    File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
+
     File dstFile = NNStorage.getFinalizedEditsFile(
         sd, firstTxId, lastTxId);
     LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
@@ -89,6 +96,9 @@ class FileJournalManager implements Jour
     if (!inprogressFile.renameTo(dstFile)) {
       throw new IOException("Unable to finalize edits file " + inprogressFile);
     }
+    if (inprogressFile.equals(currentInProgress)) {
+      currentInProgress = null;
+    }
   }
 
   @VisibleForTesting
@@ -97,12 +107,7 @@ class FileJournalManager implements Jour
   }
 
   @Override
-  public String toString() {
-    return "FileJournalManager for storage directory " + sd;
-  }
-
-  @Override
-  public void setOutputBufferCapacity(int size) {
+  synchronized public void setOutputBufferCapacity(int size) {
     this.outputBufferCapacity = size;
   }
 
@@ -120,13 +125,6 @@ class FileJournalManager implements Jour
     }
   }
 
-  @Override
-  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
-      throws IOException {
-    File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
-    return new EditLogFileInputStream(f);
-  }
-  
   /**
    * Find all editlog segments starting at or above the given txid.
    * @param fromTxId the txnid which to start looking
@@ -178,17 +176,156 @@ class FileJournalManager implements Jour
         try {
           long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
           ret.add(
-            new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END));
+              new EditLogFile(f, startTxId, startTxId, true));
         } catch (NumberFormatException nfe) {
           LOG.error("In-progress edits file " + f + " has improperly " +
                     "formatted transaction ID");
           // skip
-        }          
+        }
       }
     }
     return ret;
   }
 
+  @Override
+  synchronized public EditLogInputStream getInputStream(long fromTxId) 
+      throws IOException {
+    for (EditLogFile elf : getLogFiles(fromTxId)) {
+      if (elf.getFirstTxId() == fromTxId) {
+        if (elf.isInProgress()) {
+          elf.validateLog();
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Returning edit stream reading from " + elf);
+        }
+        return new EditLogFileInputStream(elf.getFile(), 
+            elf.getFirstTxId(), elf.getLastTxId());
+      }
+    }
+
+    throw new IOException("Cannot find editlog file with " + fromTxId
+        + " as first first txid");
+  }
+
+  @Override
+  public long getNumberOfTransactions(long fromTxId) 
+      throws IOException, CorruptionException {
+    long numTxns = 0L;
+    
+    for (EditLogFile elf : getLogFiles(fromTxId)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Counting " + elf);
+      }
+      if (elf.getFirstTxId() > fromTxId) { // there must be a gap
+        LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
+            + fromTxId + " - " + (elf.getFirstTxId() - 1));
+        break;
+      } else if (fromTxId == elf.getFirstTxId()) {
+        if (elf.isInProgress()) {
+          elf.validateLog();
+        } 
+
+        if (elf.isCorrupt()) {
+          break;
+        }
+        fromTxId = elf.getLastTxId() + 1;
+        numTxns += fromTxId - elf.getFirstTxId();
+        
+        if (elf.isInProgress()) {
+          break;
+        }
+      } // else skip
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Journal " + this + " has " + numTxns 
+                + " txns from " + fromTxId);
+    }
+
+    long max = findMaxTransaction();
+    // fromTxId should be greater than max, as it points to the next 
+    // transaction we should expect to find. If it is less than or equal
+    // to max, it means that a transaction with txid == max has not been found
+    if (numTxns == 0 && fromTxId <= max) { 
+      String error = String.format("Gap in transactions, max txnid is %d"
+                                   + ", 0 txns from %d", max, fromTxId);
+      LOG.error(error);
+      throw new CorruptionException(error);
+    }
+
+    return numTxns;
+  }
+
+  @Override
+  synchronized public void recoverUnfinalizedSegments() throws IOException {
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+    
+    // make sure journal is aware of max seen transaction before moving corrupt 
+    // files aside
+    findMaxTransaction();
+
+    for (EditLogFile elf : allLogFiles) {
+      if (elf.getFile().equals(currentInProgress)) {
+        continue;
+      }
+      if (elf.isInProgress()) {
+        elf.validateLog();
+
+        if (elf.isCorrupt()) {
+          elf.moveAsideCorruptFile();
+          continue;
+        }
+        finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
+      }
+    }
+  }
+
+  private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
+    List<EditLogFile> logFiles = Lists.newArrayList();
+    
+    for (EditLogFile elf : allLogFiles) {
+      if (fromTxId > elf.getFirstTxId()
+          && fromTxId <= elf.getLastTxId()) {
+        throw new IOException("Asked for fromTxId " + fromTxId
+            + " which is in middle of file " + elf.file);
+      }
+      if (fromTxId <= elf.getFirstTxId()) {
+        logFiles.add(elf);
+      }
+    }
+    
+    Collections.sort(logFiles, EditLogFile.COMPARE_BY_START_TXID);
+
+    return logFiles;
+  }
+
+  /** 
+   * Find the maximum transaction ID in the journal.
+   * The result is cached in a member variable because corrupt edit logs
+   * are later moved aside, but we still need to remember their first
+   * transaction ID in case it was the maximum transaction in
+   * the journal.
+   */
+  private long findMaxTransaction()
+      throws IOException {
+    for (EditLogFile elf : getLogFiles(0)) {
+      if (elf.isInProgress()) {
+        maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
+        elf.validateLog();
+      }
+      maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
+    }
+    return maxSeenTransaction;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FileJournalManager(root=%s)", sd.getRoot());
+  }
+
   /**
    * Record of an edit log that has been located and had its filename parsed.
    */
@@ -196,12 +333,10 @@ class FileJournalManager implements Jour
     private File file;
     private final long firstTxId;
     private long lastTxId;
-    
-    private EditLogValidation cachedValidation = null;
+
     private boolean isCorrupt = false;
-    
-    static final long UNKNOWN_END = -1;
-    
+    private final boolean isInProgress;
+
     final static Comparator<EditLogFile> COMPARE_BY_START_TXID 
       = new Comparator<EditLogFile>() {
       public int compare(EditLogFile a, EditLogFile b) {
@@ -214,30 +349,24 @@ class FileJournalManager implements Jour
 
     EditLogFile(File file,
         long firstTxId, long lastTxId) {
-      assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId;
-      assert firstTxId > 0;
+      this(file, firstTxId, lastTxId, false);
+      assert (lastTxId != HdfsConstants.INVALID_TXID)
+        && (lastTxId >= firstTxId);
+    }
+    
+    EditLogFile(File file, long firstTxId, 
+                long lastTxId, boolean isInProgress) { 
+      assert (lastTxId == HdfsConstants.INVALID_TXID && isInProgress)
+        || (lastTxId != HdfsConstants.INVALID_TXID && lastTxId >= firstTxId);
+      assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
       assert file != null;
       
       this.firstTxId = firstTxId;
       this.lastTxId = lastTxId;
       this.file = file;
+      this.isInProgress = isInProgress;
     }
     
-    public void finalizeLog() throws IOException {
-      long numTransactions = validateLog().numTransactions;
-      long lastTxId = firstTxId + numTransactions - 1;
-      File dst = new File(file.getParentFile(),
-          NNStorage.getFinalizedEditsFileName(firstTxId, lastTxId));
-      LOG.info("Finalizing edits log " + file + " by renaming to "
-          + dst.getName());
-      if (!file.renameTo(dst)) {
-        throw new IOException("Couldn't finalize log " +
-            file + " to " + dst);
-      }
-      this.lastTxId = lastTxId;
-      file = dst;
-    }
-
     long getFirstTxId() {
       return firstTxId;
     }
@@ -246,15 +375,22 @@ class FileJournalManager implements Jour
       return lastTxId;
     }
 
-    EditLogValidation validateLog() throws IOException {
-      if (cachedValidation == null) {
-        cachedValidation = EditLogFileInputStream.validateEditLog(file);
+    /** 
+     * Count the number of valid transactions in a log.
+     * This updates the lastTxId of the EditLogFile, or marks
+     * it as corrupt if it contains no valid transactions.
+     */
+    void validateLog() throws IOException {
+      EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
+      if (val.getNumTransactions() == 0) {
+        markCorrupt();
+      } else {
+        this.lastTxId = val.getEndTxId();
       }
-      return cachedValidation;
     }
 
     boolean isInProgress() {
-      return (lastTxId == UNKNOWN_END);
+      return isInProgress;
     }
 
     File getFile() {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Sat Jan  7 16:28:27 2012
@@ -39,6 +39,25 @@ interface JournalManager {
    */
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
+  /**
+   * Get the input stream starting with fromTxnId from this journal manager.
+   * @param fromTxnId the first transaction id we want to read
+   * @return the stream starting with transaction fromTxnId
+   * @throws IOException if a stream cannot be found.
+   */
+  EditLogInputStream getInputStream(long fromTxnId) throws IOException;
+
+  /**
+   * Get the number of transactions contiguously available from fromTxnId.
+   *
+   * @param fromTxnId Transaction id to count from
+   * @return The number of transactions available from fromTxnId
+   * @throws IOException if the journal cannot be read.
+   * @throws CorruptionException if there is a gap in the journal at fromTxnId.
+   */
+  long getNumberOfTransactions(long fromTxnId) 
+      throws IOException, CorruptionException;
+
   /**
    * Set the amount of memory that this stream should use to buffer edits
    */
@@ -57,10 +76,21 @@ interface JournalManager {
     throws IOException;
 
   /**
-   * @return an EditLogInputStream that reads from the same log that
-   * the edit log is currently writing. May return null if this journal
-   * manager does not support this operation.
-   */  
-  EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
-    throws IOException;
+   * Recover segments which have not been finalized.
+   */
+  void recoverUnfinalizedSegments() throws IOException;
+
+  /** 
+   * Indicates that a journal cannot be used to load a certain range of
+   * edits.
+   * This exception occurs in the case of a gap in the transactions, or a
+   * corrupt edit file.
+   */
+  public static class CorruptionException extends IOException {
+    static final long serialVersionUID = -4687802717006172702L;
+    
+    public CorruptionException(String reason) {
+      super(reason);
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java Sat Jan  7 16:28:27 2012
@@ -248,7 +248,7 @@ public class TestDFSRollback extends Tes
       baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       deleteMatchingFiles(baseDirs, "edits.*");
       startNameNodeShouldFail(StartupOption.ROLLBACK,
-          "but there are no logs to load");
+          "No non-corrupt logs for txid ");
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with no image file", numDirs);

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java Sat Jan  7 16:28:27 2012
@@ -392,12 +392,9 @@ public abstract class FSImageTestUtil {
    */
   public static EditLogFile findLatestEditsLog(StorageDirectory sd)
   throws IOException {
-    FSImageTransactionalStorageInspector inspector =
-      new FSImageTransactionalStorageInspector();
-    inspector.inspectDirectory(sd);
-    
-    List<EditLogFile> foundEditLogs = Lists.newArrayList(
-        inspector.getEditLogFiles());
+    File currentDir = sd.getCurrentDir();
+    List<EditLogFile> foundEditLogs 
+      = Lists.newArrayList(FileJournalManager.matchEditLogs(currentDir.listFiles()));
     return Collections.max(foundEditLogs, EditLogFile.COMPARE_BY_START_TXID);
   }
 

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java?rev=1228651&r1=1228650&r2=1228651&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java Sat Jan  7 16:28:27 2012
@@ -84,8 +84,10 @@ public class TestCheckPointForSecurityTo
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
+        log.validateLog();
+        long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should have 5 transactions",
-            5, log.validateLog().numTransactions);
+                     5, numTransactions);;
       }
 
       // Saving image in safe mode should succeed
@@ -99,8 +101,10 @@ public class TestCheckPointForSecurityTo
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
+        log.validateLog();
+        long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should only have START txn",
-            1, log.validateLog().numTransactions);
+            1, numTransactions);
       }
 
       // restart cluster



Mime
View raw message