hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1152295 [2/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/j...
Date: Fri, 29 Jul 2011 16:28:51 GMT
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Jul 29 16:28:45 2011
@@ -19,10 +19,11 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.RPC;
@@ -39,7 +40,7 @@ import org.apache.hadoop.net.NetUtils;
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private NamenodeProtocol backupNode;          // RPC proxy to backup node
+  private JournalProtocol backupNode;        // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
   private EditsDoubleBuffer doubleBuf;
@@ -56,8 +57,8 @@ class EditLogBackupOutputStream extends 
     Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
-        (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
-            NamenodeProtocol.versionID, bnAddress, new HdfsConfiguration());
+        RPC.getProxy(JournalProtocol.class,
+            JournalProtocol.versionID, bnAddress, new HdfsConfiguration());
     } catch(IOException e) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
@@ -65,7 +66,7 @@ class EditLogBackupOutputStream extends 
     this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
-
+  
   @Override // JournalStream
   public String getName() {
     return bnRegistration.getAddress();
@@ -109,6 +110,12 @@ class EditLogBackupOutputStream extends 
     doubleBuf = null;
   }
 
+  @Override
+  public void abort() throws IOException {
+    RPC.stopProxy(backupNode);
+    doubleBuf = null;
+  }
+
   @Override // EditLogOutputStream
   void setReadyToFlush() throws IOException {
     doubleBuf.setReadyToFlush();
@@ -116,11 +123,21 @@ class EditLogBackupOutputStream extends 
 
   @Override // EditLogOutputStream
   protected void flushAndSync() throws IOException {
-    // XXX: this code won't work in trunk, but it's redone
-    // in HDFS-1073 where it's simpler.
+    assert out.getLength() == 0 : "Output buffer is not empty";
+    
+    int numReadyTxns = doubleBuf.countReadyTxns();
+    long firstTxToFlush = doubleBuf.getFirstReadyTxId();
+    
     doubleBuf.flushTo(out);
-    if (out.size() > 0) {
-      send(NamenodeProtocol.JA_JOURNAL);
+    if (out.getLength() > 0) {
+      assert numReadyTxns > 0;
+      
+      byte[] data = Arrays.copyOf(out.getData(), out.getLength());
+      out.reset();
+      assert out.getLength() == 0 : "Output buffer is not empty";
+
+      backupNode.journal(nnRegistration,
+          firstTxToFlush, numReadyTxns, data);
     }
   }
 
@@ -134,16 +151,6 @@ class EditLogBackupOutputStream extends 
     return 0;
   }
 
-  private void send(int ja) throws IOException {
-    try {
-      int length = out.getLength();
-      out.write(FSEditLogOpCodes.OP_INVALID.getOpCode());
-      backupNode.journal(nnRegistration, ja, length, out.getData());
-    } finally {
-      out.reset();
-    }
-  }
-
   /**
    * Get backup node registration.
    */
@@ -151,17 +158,7 @@ class EditLogBackupOutputStream extends 
     return bnRegistration;
   }
 
-  /**
-   * Verify that the backup node is alive.
-   */
-  boolean isAlive() {
-    try {
-      send(NamenodeProtocol.JA_IS_ALIVE);
-    } catch(IOException ei) {
-      Storage.LOG.info(bnRegistration.getRole() + " "
-                      + bnRegistration.getAddress() + " is not alive. ", ei);
-      return false;
-    }
-    return true;
+  void startLogSegment(long txId) throws IOException {
+    backupNode.startLogSegment(nnRegistration, txId);
   }
 }
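
[Annotation] The flushAndSync() rewrite above replaces the old opaque send(JA_JOURNAL) helper with a txid-addressed RPC: the ready half of the double buffer is copied out, the staging buffer is reset, and the bytes are shipped together with the first txid and the transaction count, so the receiver knows exactly which transactions it holds. A minimal sketch of that pattern, using a hypothetical JournalReceiver interface in place of the JournalProtocol proxy and plain JDK types in place of the Hadoop classes:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    /** Hypothetical stand-in for the JournalProtocol RPC proxy. */
    interface JournalReceiver {
      void journal(long firstTxId, int numTxns, byte[] records) throws IOException;
    }

    class BackupFlushSketch {
      private final ByteArrayOutputStream out = new ByteArrayOutputStream();

      /** Ship everything staged in 'out' as one txid-addressed batch. */
      void flushAndSync(JournalReceiver backupNode, long firstTxToFlush,
                        int numReadyTxns) throws IOException {
        if (out.size() == 0) {
          return;                      // nothing ready to flush
        }
        // Snapshot the ready bytes, then empty the staging buffer so the
        // "Output buffer is not empty" assertions in the real code hold.
        byte[] data = out.toByteArray();
        out.reset();
        backupNode.journal(firstTxToFlush, numReadyTxns, data);
      }
    }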

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Fri Jul 29 16:28:45 2011
@@ -70,4 +70,10 @@ class EditLogFileInputStream extends Edi
     // file size + size of both buffers
     return file.length();
   }
+  
+  @Override
+  public String toString() {
+    return getName();
+  }
+
 }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Jul 29 16:28:45 2011
@@ -25,6 +25,8 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.IOUtils;
 
@@ -35,6 +37,8 @@ import com.google.common.annotations.Vis
  * stores edits in a local file.
  */
 class EditLogFileOutputStream extends EditLogOutputStream {
+  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
+
   private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
 
   private File file;
@@ -85,7 +89,14 @@ class EditLogFileOutputStream extends Ed
     doubleBuf.writeOp(op);
   }
 
-  /** {@inheritDoc} */
+  /**
+   * Write a transaction to the stream. The serialization format is:
+   * <ul>
+   *   <li>the opcode (byte)</li>
+   *   <li>the transaction id (long)</li>
+   *   <li>the actual Writables for the transaction</li>
+   * </ul>
+   */
   @Override
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
     doubleBuf.writeRaw(bytes, offset, length);
@@ -105,6 +116,10 @@ class EditLogFileOutputStream extends Ed
 
   @Override
   public void close() throws IOException {
+    if (fp == null) {
+      throw new IOException("Trying to use aborted output stream");
+    }
+
     try {
       // close should have been called after all pending transactions
       // have been flushed & synced.
@@ -130,6 +145,16 @@ class EditLogFileOutputStream extends Ed
       fc = null;
       fp = null;
     }
+    fp = null;
+  }
+  
+  @Override
+  public void abort() throws IOException {
+    if (fp == null) {
+      return;
+    }
+    IOUtils.cleanup(LOG, fp);
+    fp = null;
   }
 
   /**
@@ -148,6 +173,10 @@ class EditLogFileOutputStream extends Ed
    */
   @Override
   protected void flushAndSync() throws IOException {
+    if (fp == null) {
+      throw new IOException("Trying to use aborted output stream");
+    }
+    
     preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
@@ -190,19 +219,17 @@ class EditLogFileOutputStream extends Ed
   }
 
   /**
-   * Operations like OP_JSPOOL_START and OP_CHECKPOINT_TIME should not be
-   * written into edits file.
+   * Returns the file associated with this stream.
    */
-  @Override
-  boolean isOperationSupported(byte op) {
-    return op < FSEditLogOpCodes.OP_JSPOOL_START.getOpCode() - 1;
+  File getFile() {
+    return file;
   }
 
   /**
-   * Returns the file associated with this stream.
+   * @return true if this stream is currently open.
    */
-  File getFile() {
-    return file;
+  public boolean isOpen() {
+    return fp != null;
   }
   
   @VisibleForTesting
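
[Annotation] The new abort() above, together with the fp == null guards added to close() and flushAndSync(), gives the stream a simple lifecycle: abort() is a best-effort, idempotent release, while any later use of the stream fails loudly. A compact sketch of just that contract, in plain Java rather than the Hadoop class:

    import java.io.Closeable;
    import java.io.IOException;

    class GuardedStream {
      private Closeable fp;            // null once closed or aborted

      GuardedStream(Closeable resource) { this.fp = resource; }

      void close() throws IOException {
        if (fp == null) {
          throw new IOException("Trying to use aborted output stream");
        }
        try {
          fp.close();                  // flush-and-close path
        } finally {
          fp = null;                   // never close twice
        }
      }

      void abort() {
        if (fp == null) {
          return;                      // idempotent: safe after close/abort
        }
        try {
          fp.close();                  // best effort, like IOUtils.cleanup
        } catch (IOException ignored) {
          // abort() swallows close errors by design
        }
        fp = null;
      }

      boolean isOpen() { return fp != null; }
    }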

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Jul 29 16:28:45 2011
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.zip.Checksum;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+
 /**
  * A generic abstract class to support journaling of edits logs into 
  * a persistent storage.
@@ -63,10 +67,20 @@ abstract class EditLogOutputStream imple
    */
   abstract void create() throws IOException;
 
-  /** {@inheritDoc} */
+  /**
+   * Close the journal.
+   * @throws IOException if the journal can't be closed,
+   *         or if there are unflushed edits
+   */
   abstract public void close() throws IOException;
 
   /**
+   * Close the stream without necessarily flushing any pending data.
+   * This may be called after a previous write or close threw an exception.
+   */
+  abstract public void abort() throws IOException;
+  
+  /**
    * All data that has been written to the stream so far will be flushed.
    * New data can be still written to the stream while flushing is performed.
    */
@@ -108,10 +122,6 @@ abstract class EditLogOutputStream imple
     return false;
   }
   
-  boolean isOperationSupported(byte op) {
-    return true;
-  }
-
   /**
    * Return total time spent in {@link #flushAndSync()}
    */
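
[Annotation] Per the new javadoc, abort() exists precisely so callers can tear a stream down after close() or a write has already failed. A hedged sketch of the expected caller-side pattern; EditLogOutputStreamLike is a hypothetical cut-down of the abstract class above:

    import java.io.IOException;

    class AbortUsageSketch {
      /** Close the stream cleanly, falling back to abort() on failure. */
      static void closeOrAbort(EditLogOutputStreamLike stream) throws IOException {
        try {
          stream.setReadyToFlush();
          stream.flush();
          stream.close();
        } catch (IOException e) {
          stream.abort();              // release resources; pending edits dropped
          throw e;
        }
      }

      /** Minimal surface of the abstract class, for the sketch only. */
      interface EditLogOutputStreamLike {
        void setReadyToFlush() throws IOException;
        void flush() throws IOException;
        void close() throws IOException;
        void abort() throws IOException;
      }
    }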

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Jul 29 16:28:45 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
@@ -35,20 +36,19 @@ import com.google.common.base.Preconditi
  */
 class EditsDoubleBuffer {
 
-  private DataOutputBuffer bufCurrent; // current buffer for writing
-  private DataOutputBuffer bufReady; // buffer ready for flushing
+  private TxnBuffer bufCurrent; // current buffer for writing
+  private TxnBuffer bufReady; // buffer ready for flushing
   private final int initBufferSize;
-  private Writer writer;
 
   public EditsDoubleBuffer(int defaultBufferSize) {
     initBufferSize = defaultBufferSize;
-    bufCurrent = new DataOutputBuffer(initBufferSize);
-    bufReady = new DataOutputBuffer(initBufferSize);
-    writer = new FSEditLogOp.Writer(bufCurrent);
+    bufCurrent = new TxnBuffer(initBufferSize);
+    bufReady = new TxnBuffer(initBufferSize);
+
   }
     
   public void writeOp(FSEditLogOp op) throws IOException {
-    writer.writeOp(op);
+    bufCurrent.writeOp(op);
   }
 
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
@@ -71,10 +71,9 @@ class EditsDoubleBuffer {
   
   void setReadyToFlush() {
     assert isFlushed() : "previous data not flushed yet";
-    DataOutputBuffer tmp = bufReady;
+    TxnBuffer tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
-    writer = new FSEditLogOp.Writer(bufCurrent);
   }
   
   /**
@@ -102,4 +101,50 @@ class EditsDoubleBuffer {
     return bufReady.size() + bufCurrent.size();
   }
 
+  /**
+   * @return the transaction ID of the first transaction ready to be flushed 
+   */
+  public long getFirstReadyTxId() {
+    assert bufReady.firstTxId > 0;
+    return bufReady.firstTxId;
+  }
+
+  /**
+   * @return the number of transactions that are ready to be flushed
+   */
+  public int countReadyTxns() {
+    return bufReady.numTxns;
+  }
+
+  
+  private static class TxnBuffer extends DataOutputBuffer {
+    long firstTxId;
+    int numTxns;
+    private Writer writer;
+    
+    public TxnBuffer(int initBufferSize) {
+      super(initBufferSize);
+      writer = new FSEditLogOp.Writer(this);
+      reset();
+    }
+
+    public void writeOp(FSEditLogOp op) throws IOException {
+      if (firstTxId == FSConstants.INVALID_TXID) {
+        firstTxId = op.txid;
+      } else {
+        assert op.txid > firstTxId;
+      }
+      writer.writeOp(op);
+      numTxns++;
+    }
+    
+    @Override
+    public DataOutputBuffer reset() {
+      super.reset();
+      firstTxId = FSConstants.INVALID_TXID;
+      numTxns = 0;
+      return this;
+    }
+  }
+
 }
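
[Annotation] The TxnBuffer introduced above is what makes getFirstReadyTxId()/countReadyTxns() possible: each buffer remembers the first txid written into it and the number of ops since the last reset(), and setReadyToFlush() simply swaps bufCurrent and bufReady, so the ready buffer's bookkeeping describes exactly the batch about to be flushed. A toy version over ByteArrayOutputStream; the -1 sentinel is a placeholder for FSConstants.INVALID_TXID, whose actual value is not shown in this patch:

    import java.io.ByteArrayOutputStream;

    class TxnBufferSketch extends ByteArrayOutputStream {
      static final long INVALID_TXID = -1;  // placeholder sentinel
      long firstTxId = INVALID_TXID;
      int numTxns = 0;

      /** Record one serialized op carrying the given txid. */
      void writeOp(long txid, byte[] serializedOp) {
        if (firstTxId == INVALID_TXID) {
          firstTxId = txid;            // first txn of this batch
        } else {
          assert txid > firstTxId;     // txids are strictly increasing
        }
        write(serializedOp, 0, serializedOp.length);
        numTxns++;
      }

      @Override
      public synchronized void reset() {
        super.reset();                 // drop the bytes...
        firstTxId = INVALID_TXID;      // ...and the txid bookkeeping
        numTxns = 0;
      }
    }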

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jul 29 16:28:45 2011
@@ -22,9 +22,7 @@ import static org.apache.hadoop.hdfs.ser
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -157,26 +155,33 @@ public class FSDirectory implements Clos
     return getFSNamesystem().getBlockManager();
   }
 
-  void loadFSImage(Collection<URI> dataDirs,
-                   Collection<URI> editsDirs,
-                   StartupOption startOpt) 
+  /**
+   * Load the filesystem image into memory.
+   *
+   * @param startOpt Startup type as specified by the user.
+   * @throws IOException If image or editlog cannot be read.
+   */
+  void loadFSImage(StartupOption startOpt) 
       throws IOException {
     // format before starting up if requested
     if (startOpt == StartupOption.FORMAT) {
-      fsImage.getStorage().setStorageDirectories(dataDirs, editsDirs);
-      fsImage.getStorage().format(fsImage.getStorage().determineClusterId()); // reuse current id
+      fsImage.format(fsImage.getStorage().determineClusterId());// reuse current id
+
       startOpt = StartupOption.REGULAR;
     }
+    boolean success = false;
     try {
-      if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
-        fsImage.saveNamespace(true);
+      if (fsImage.recoverTransitionRead(startOpt)) {
+        fsImage.saveNamespace();
       }
-      FSEditLog editLog = fsImage.getEditLog();
-      assert editLog != null : "editLog must be initialized";
+      fsImage.openEditLog();
+      
       fsImage.setCheckpointDirectories(null, null);
-    } catch(IOException e) {
-      fsImage.close();
-      throw e;
+      success = true;
+    } finally {
+      if (!success) {
+        fsImage.close();
+      }
     }
     writeLock();
     try {
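
[Annotation] The loadFSImage() rewrite above swaps the old catch-close-rethrow for the success-flag idiom: cleanup now runs on any failure, including non-IOException throwables, and the original exception propagates untouched. The shape of it in isolation, a minimal sketch with a hypothetical IORunnable:

    import java.io.Closeable;
    import java.io.IOException;

    class CleanupSketch {
      /** Run 'work'; close 'resource' only if it fails. */
      static void runOrClose(Closeable resource, IORunnable work)
          throws IOException {
        boolean success = false;
        try {
          work.run();                  // may throw anything
          success = true;
        } finally {
          if (!success) {
            resource.close();          // cleanup only on the failure path
          }
        }
      }

      interface IORunnable {
        void run() throws IOException;
      }
    }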

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 29 16:28:45 2011
@@ -17,10 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.zip.Checksum;
@@ -33,22 +30,26 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
-import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.PureJavaCrc32;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
@@ -58,23 +59,42 @@ import org.apache.hadoop.hdfs.server.nam
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class FSEditLog implements NNStorageListener {
+public class FSEditLog {
 
   static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
       " File system changes are not persistent. No journal streams.";
 
-  private static final Log LOG = LogFactory.getLog(FSEditLog.class);
+  static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
-  private volatile int sizeOutputFlushBuffer = 512*1024;
+  /**
+   * State machine for edit log.
+   * The log starts in UNINITIALIZED state upon construction. Once it's
+   * initialized, it is usually in IN_SEGMENT state, indicating that edits
+   * may be written. In the middle of a roll, or while saving the namespace,
+   * it briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the
+   * previous segment has been closed, but the new one has not yet been opened.
+   */
+  private enum State {
+    UNINITIALIZED,
+    BETWEEN_LOG_SEGMENTS,
+    IN_SEGMENT,
+    CLOSED;
+  }  
+  private State state = State.UNINITIALIZED;
 
-  private ArrayList<EditLogOutputStream> editStreams = null;
 
+  private List<JournalAndStream> journals = Lists.newArrayList();
+    
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
 
   // stores the last synced transactionId.
   private long synctxid = 0;
 
+  // the first txid of the log that's currently open for writing.
+  // If this value is N, we are currently writing to edits_inprogress_N
+  private long curSegmentTxId = FSConstants.INVALID_TXID;
+
   // the time of printing the statistics to the log file.
   private long lastPrintTime;
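
[Annotation] The State enum documented above can be exercised in isolation. A tiny driver for the described transitions (UNINITIALIZED -> BETWEEN_LOG_SEGMENTS -> IN_SEGMENT <-> BETWEEN_LOG_SEGMENTS -> CLOSED), simplified from the Preconditions checks that appear later in this patch:

    enum LogState { UNINITIALIZED, BETWEEN_LOG_SEGMENTS, IN_SEGMENT, CLOSED }

    class StateSketch {
      private LogState state = LogState.UNINITIALIZED;

      private void expect(LogState s) {
        if (state != s) {
          throw new IllegalStateException("Bad state: " + state);
        }
      }

      void initJournals() {
        expect(LogState.UNINITIALIZED);
        state = LogState.BETWEEN_LOG_SEGMENTS;
      }

      void startLogSegment() {
        expect(LogState.BETWEEN_LOG_SEGMENTS);
        state = LogState.IN_SEGMENT;   // edits may be written
      }

      void endCurrentLogSegment() {
        expect(LogState.IN_SEGMENT);
        state = LogState.BETWEEN_LOG_SEGMENTS;
      }

      void close() {
        state = LogState.CLOSED;       // terminal
      }
    }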
 
@@ -83,6 +103,10 @@ public class FSEditLog implements NNStor
 
   // is an automatic sync scheduled?
   private volatile boolean isAutoSyncScheduled = false;
+  
+  // Used to exit in the event of a failure to sync to all journals. It's a
+  // member variable so it can be swapped out for testing.
+  private Runtime runtime = Runtime.getRuntime();
 
   // these are statistics counters.
   private long numTransactions;        // number of transactions
@@ -122,226 +146,90 @@ public class FSEditLog implements NNStor
   FSEditLog(NNStorage storage) {
     isSyncRunning = false;
     this.storage = storage;
-    this.storage.registerListener(this);
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
   }
   
-  private File getEditFile(StorageDirectory sd) {
-    return storage.getEditFile(sd);
-  }
-  
-  private File getEditNewFile(StorageDirectory sd) {
-    return storage.getEditNewFile(sd);
-  }
-  
-  private int getNumEditsDirs() {
-   return storage.getNumStorageDirs(NameNodeDirType.EDITS);
-  }
-
-  synchronized int getNumEditStreams() {
-    return editStreams == null ? 0 : editStreams.size();
-  }
-
   /**
-   * Return the currently active edit streams.
-   * This should be used only by unit tests.
+   * Initialize the list of edit journals
    */
-  ArrayList<EditLogOutputStream> getEditStreams() {
-    return editStreams;
-  }
-
-  boolean isOpen() {
-    return getNumEditStreams() > 0;
-  }
-
-  /**
-   * Create empty edit log files.
-   * Initialize the output stream for logging.
-   * 
-   * @throws IOException
-   */
-  synchronized void open() throws IOException {
-    numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
-    if (editStreams == null)
-      editStreams = new ArrayList<EditLogOutputStream>();
+  synchronized void initJournals() {
+    assert journals.isEmpty();
+    Preconditions.checkState(state == State.UNINITIALIZED,
+        "Bad state: %s", state);
     
-    ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it 
-         = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File eFile = getEditFile(sd);
-      try {
-        addNewEditLogStream(eFile);
-      } catch (IOException e) {
-        LOG.warn("Unable to open edit log file " + eFile);
-        // Remove the directory from list of storage directories
-        if(al == null) al = new ArrayList<StorageDirectory>(1);
-        al.add(sd);
-      }
+    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
+      journals.add(new JournalAndStream(new FileJournalManager(sd)));
     }
     
-    if (al != null)
-      storage.reportErrorsOnDirectories(al);
-    
-    // If there was an error in every storage dir, each one will have
-    // been removed from the list of storage directories.
-    if (storage.getNumStorageDirs(NameNodeDirType.EDITS) == 0) {
-      throw new IOException(
-          "Failed to initialize edits log in any storage directory.");
+    if (journals.isEmpty()) {
+      LOG.error("No edits directories configured!");
     }
+    
+    state = State.BETWEEN_LOG_SEGMENTS;
   }
   
-  
-  synchronized void addNewEditLogStream(File eFile) throws IOException {
-    EditLogOutputStream eStream = new EditLogFileOutputStream(eFile,
-        sizeOutputFlushBuffer);
-    editStreams.add(eStream);
-  }
-
-  synchronized void createEditLogFile(File name) throws IOException {
-    waitForSyncToFinish();
-
-    EditLogOutputStream eStream = new EditLogFileOutputStream(name,
-        sizeOutputFlushBuffer);
-    eStream.create();
-    eStream.close();
-  }
-
   /**
-   * Shutdown the file store.
+   * Initialize the output stream for logging, opening the first
+   * log segment.
    */
-  synchronized void close() {
-    waitForSyncToFinish();
-    if (editStreams == null || editStreams.isEmpty()) {
-      return;
-    }
-    printStatistics(true);
-    numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
-
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    Iterator<EditLogOutputStream> it = getOutputStreamIterator(null);
-    while(it.hasNext()) {
-      EditLogOutputStream eStream = it.next();
-      try {
-        closeStream(eStream);
-      } catch (IOException e) {
-        LOG.warn("FSEditLog:close - failed to close stream " 
-            + eStream.getName());
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-      }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
-    editStreams.clear();
-  }
+  synchronized void open() throws IOException {
+    Preconditions.checkState(state == State.UNINITIALIZED);
+    initJournals();
 
-  /**
-   * Close and remove edit log stream.
-   * @param index of the stream
-   */
-  synchronized private void removeStream(int index) {
-    EditLogOutputStream eStream = editStreams.get(index);
-    try {
-      eStream.close();
-    } catch (Exception e) {}
-    editStreams.remove(index);
+    startLogSegment(getLastWrittenTxId() + 1, true);
+    assert state == State.IN_SEGMENT : "Bad state: " + state;
   }
-
-  /**
-   * The specified streams have IO errors. Close and remove them.
-   */
-  synchronized
-  void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
-    if (errorStreams == null || errorStreams.size() == 0) {
-      return;                       // nothing to do
-    }
-    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
-    for (EditLogOutputStream e : errorStreams) {
-      if (e.getType() == JournalType.FILE) {
-        errorDirs.add(getStorageDirectoryForStream(e));
-      } else {
-        disableStream(e);
-      }
-    }
-
-    try {
-      storage.reportErrorsOnDirectories(errorDirs);
-    } catch (IOException ioe) {
-      LOG.error("Problem erroring streams " + ioe);
-    }
+  
+  synchronized boolean isOpen() {
+    return state == State.IN_SEGMENT;
   }
 
-
   /**
-   * get an editStream corresponding to a sd
-   * @param es - stream to remove
-   * @return the matching stream
+   * Shutdown the file store.
    */
-  StorageDirectory getStorage(EditLogOutputStream es) {
-    String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
-    .getParentFile().getParentFile().getAbsolutePath();
-
-    Iterator<StorageDirectory> it = storage.dirIterator(); 
-    while (it.hasNext()) {
-      StorageDirectory sd = it.next();
-      LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath()); 
-      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
-        return sd;
+  synchronized void close() {
+    if (state == State.CLOSED) {
+      LOG.warn("Closing log when already closed", new Exception());
+      return;
     }
-    return null;
-  }
-  
-  /**
-   * get an editStream corresponding to a sd
-   * @param sd
-   * @return the matching stream
-   */
-  synchronized EditLogOutputStream getEditsStream(StorageDirectory sd) {
-    for (EditLogOutputStream es : editStreams) {
-      File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
-        .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
-        return es;
+    
+    if (state == State.IN_SEGMENT) {
+      assert !journals.isEmpty();
+      waitForSyncToFinish();
+      endCurrentLogSegment(true);
     }
-    return null;
-  }
 
-  /**
-   * check if edits.new log exists in the specified stoorage directory
-   */
-  boolean existsNew(StorageDirectory sd) {
-    return getEditNewFile(sd).exists(); 
+    state = State.CLOSED;
   }
 
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  void logEdit(FSEditLogOp op) {
+  void logEdit(final FSEditLogOp op) {
     synchronized (this) {
+      assert state != State.CLOSED;
+      
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
       
-      if(getNumEditStreams() == 0)
+      if (journals.isEmpty()) {
         throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      ArrayList<EditLogOutputStream> errorStreams = null;
-      long start = now();
-      for(EditLogOutputStream eStream : editStreams) {
-        if(!eStream.isOperationSupported(op.opCode.getOpCode()))
-          continue;
-        try {
-          eStream.write(op);
-        } catch (IOException ie) {
-          LOG.error("logEdit: removing "+ eStream.getName(), ie);
-          if(errorStreams == null)
-            errorStreams = new ArrayList<EditLogOutputStream>(1);
-          errorStreams.add(eStream);
-        }
       }
-      disableAndReportErrorOnStreams(errorStreams);
-      recordTransaction(start);
+      
+      long start = beginTransaction();
+      op.setTransactionId(txid);
+
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override 
+        public void apply(JournalAndStream jas) throws IOException {
+          if (!jas.isActive()) return;
+          jas.stream.write(op);
+        }
+      }, "logging edit");
+
+      endTransaction(start);
       
       // check if it is time to schedule an automatic sync
       if (!shouldForceSync()) {
@@ -384,15 +272,18 @@ public class FSEditLog implements NNStor
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (EditLogOutputStream eStream : editStreams) {
-      if (eStream.shouldForceSync()) {
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+
+      if (jas.getCurrentStream().shouldForceSync()) {
         return true;
       }
     }
     return false;
   }
   
-  private void recordTransaction(long start) {
+  private long beginTransaction() {
+    assert Thread.holdsLock(this);
     // get a new transactionId
     txid++;
 
@@ -401,7 +292,12 @@ public class FSEditLog implements NNStor
     //
     TransactionId id = myTransactionId.get();
     id.txid = txid;
-
+    return now();
+  }
+  
+  private void endTransaction(long start) {
+    assert Thread.holdsLock(this);
+    
     // update statistics
     long end = now();
     numTransactions++;
@@ -411,6 +307,35 @@ public class FSEditLog implements NNStor
   }
 
   /**
+   * Return the transaction ID of the last transaction written to the log.
+   */
+  synchronized long getLastWrittenTxId() {
+    return txid;
+  }
+  
+  /**
+   * @return the first transaction ID in the current log segment
+   */
+  synchronized long getCurSegmentTxId() {
+    Preconditions.checkState(state == State.IN_SEGMENT,
+        "Bad state: %s", state);
+    return curSegmentTxId;
+  }
+  
+  /**
+   * Set the transaction ID to use for the next transaction written.
+   */
+  synchronized void setNextTxId(long nextTxId) {
+    Preconditions.checkArgument(synctxid <= txid &&
+       nextTxId >= txid,
+       "May not decrease txid." +
+      " synctxid=%s txid=%s nextTxId=%s",
+      synctxid, txid, nextTxId);
+      
+    txid = nextTxId - 1;
+  }
+    
+  /**
    * Blocks until all ongoing edits have been synced to disk.
    * This differs from logSync in that it waits for edits that have been
    * written by other threads, not just edits from the calling thread.
@@ -457,12 +382,15 @@ public class FSEditLog implements NNStor
    * waitForSyncToFinish() before assuming they are running alone.
    */
   public void logSync() {
-    ArrayList<EditLogOutputStream> errorStreams = null;
     long syncStart = 0;
 
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
-    ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
+    
+    List<JournalAndStream> candidateJournals =
+      Lists.newArrayListWithCapacity(journals.size());
+    List<JournalAndStream> badJournals = Lists.newArrayList();
+    
     boolean sync = false;
     try {
       synchronized (this) {
@@ -493,20 +421,16 @@ public class FSEditLog implements NNStor
         sync = true;
   
         // swap buffers
-        assert editStreams.size() > 0 : "no editlog streams";
-        for(EditLogOutputStream eStream : editStreams) {
+        assert !journals.isEmpty() : "no editlog streams";
+        
+        for (JournalAndStream jas : journals) {
+          if (!jas.isActive()) continue;
           try {
-            eStream.setReadyToFlush();
-            streams.add(eStream);
+            jas.getCurrentStream().setReadyToFlush();
+            candidateJournals.add(jas);
           } catch (IOException ie) {
             LOG.error("Unable to get ready to flush.", ie);
-            //
-            // remember the streams that encountered an error.
-            //
-            if (errorStreams == null) {
-              errorStreams = new ArrayList<EditLogOutputStream>(1);
-            }
-            errorStreams.add(eStream);
+            badJournals.add(jas);
           }
         }
         } finally {
@@ -517,29 +441,36 @@ public class FSEditLog implements NNStor
   
       // do the sync
       long start = now();
-      for (EditLogOutputStream eStream : streams) {
+      for (JournalAndStream jas : candidateJournals) {
+        if (!jas.isActive()) continue;
         try {
-          eStream.flush();
+          jas.getCurrentStream().flush();
         } catch (IOException ie) {
           LOG.error("Unable to sync edit log.", ie);
           //
           // remember the streams that encountered an error.
           //
-          if (errorStreams == null) {
-            errorStreams = new ArrayList<EditLogOutputStream>(1);
-          }
-          errorStreams.add(eStream);
+          badJournals.add(jas);
         }
       }
       long elapsed = now() - start;
-      disableAndReportErrorOnStreams(errorStreams);
+      disableAndReportErrorOnJournals(badJournals);
   
-      if (metrics != null) // Metrics non-null only when used inside name node
+      if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
+      }
+      
     } finally {
       // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
         if (sync) {
+          if (badJournals.size() >= journals.size()) {
+            LOG.fatal("Could not sync any journal to persistent storage. " +
+                "Unsynced transactions: " + (txid - synctxid),
+                new Exception());
+            runtime.exit(1);
+          }
+
           synctxid = syncStart;
           isSyncRunning = false;
         }
@@ -556,7 +487,7 @@ public class FSEditLog implements NNStor
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (editStreams == null || editStreams.size()==0) {
+    if (journals.isEmpty()) {
       return;
     }
     lastPrintTime = now;
@@ -568,12 +499,17 @@ public class FSEditLog implements NNStor
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    buf.append(editStreams.get(0).getNumSync());
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+      buf.append(jas.getCurrentStream().getNumSync());
+      break;
+    }
+
     buf.append(" SyncTimes(ms): ");
 
-    int numEditStreams = editStreams.size();
-    for (int idx = 0; idx < numEditStreams; idx++) {
-      EditLogOutputStream eStream = editStreams.get(idx);
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+      EditLogOutputStream eStream = jas.getCurrentStream();
       buf.append(eStream.getTotalSyncTime());
       buf.append(" ");
     }
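
[Annotation] The reworked logSync() above keeps the long-standing two-phase shape: under the lock, swap buffers via setReadyToFlush() and collect candidate journals; outside the lock, flush each candidate; then, back under the lock, record synctxid and, new in this patch, terminate the process if every journal failed. Stripped to its skeleton, with a hypothetical Journal interface standing in for JournalAndStream:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class LogSyncSketch {
      interface Journal {
        void setReadyToFlush() throws IOException;
        void flush() throws IOException;
      }

      private final List<Journal> journals = new ArrayList<Journal>();

      void logSync() {
        List<Journal> candidates = new ArrayList<Journal>();
        List<Journal> bad = new ArrayList<Journal>();

        synchronized (this) {          // phase 1: swap buffers under the lock
          for (Journal j : journals) {
            try {
              j.setReadyToFlush();
              candidates.add(j);
            } catch (IOException e) {
              bad.add(j);
            }
          }
        }

        for (Journal j : candidates) { // phase 2: sync outside the lock
          try {
            j.flush();
          } catch (IOException e) {
            bad.add(j);
          }
        }

        synchronized (this) {          // phase 3: bookkeeping, fail fast
          if (!journals.isEmpty() && bad.size() >= journals.size()) {
            // mirrors runtime.exit(1) when no journal could be synced
            throw new IllegalStateException("no journal synced");
          }
        }
      }
    }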
@@ -788,202 +724,191 @@ public class FSEditLog implements NNStor
   }
   
   /**
-   * Return the size of the current EditLog
+   * @return the number of active (non-failed) journals
    */
-  synchronized long getEditLogSize() throws IOException {
-    assert getNumEditsDirs() <= getNumEditStreams() : 
-        "Number of edits directories should not exceed the number of streams.";
-    long size = 0;
-    ArrayList<EditLogOutputStream> al = null;
-    for (int idx = 0; idx < getNumEditStreams(); idx++) {
-      EditLogOutputStream es = editStreams.get(idx);
-      try {
-        long curSize = es.length();
-        assert (size == 0 || size == curSize || curSize ==0) :
-          "Wrong streams size";
-        size = Math.max(size, curSize);
-      } catch (IOException e) {
-        LOG.error("getEditLogSize: editstream.length failed. removing editlog (" +
-            idx + ") " + es.getName());
-        if(al==null) al = new ArrayList<EditLogOutputStream>(1);
-        al.add(es);
+  private int countActiveJournals() {
+    int count = 0;
+    for (JournalAndStream jas : journals) {
+      if (jas.isActive()) {
+        count++;
       }
     }
-    if(al!=null) disableAndReportErrorOnStreams(al);
-    return size;
+    return count;
   }
   
   /**
-   * Closes the current edit log and opens edits.new. 
+   * Used only by unit tests.
    */
-  synchronized void rollEditLog() throws IOException {
-    waitForSyncToFinish();
-    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
-    if(!it.hasNext()) 
-      return;
-    //
-    // If edits.new already exists in some directory, verify it
-    // exists in all directories.
-    //
-    boolean alreadyExists = existsNew(it.next());
-    while(it.hasNext()) {
-      StorageDirectory sd = it.next();
-      if(alreadyExists != existsNew(sd))
-        throw new IOException(getEditNewFile(sd) 
-              + "should " + (alreadyExists ? "" : "not ") + "exist.");
-    }
-    if(alreadyExists)
-      return; // nothing to do, edits.new exists!
-
-    // check if any of failed storage is now available and put it back
-    storage.attemptRestoreRemovedStorage();
-
-    divertFileStreams(
-        Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
+  @VisibleForTesting
+  List<JournalAndStream> getJournals() {
+    return journals;
   }
-
+  
   /**
-   * Divert file streams from file edits to file edits.new.<p>
-   * Close file streams, which are currently writing into edits files.
-   * Create new streams based on file getRoot()/dest.
-   * @param dest new stream path relative to the storage directory root.
-   * @throws IOException
+   * Used only by unit tests.
+   */
+  @VisibleForTesting
+  synchronized void setRuntimeForTesting(Runtime runtime) {
+    this.runtime = runtime;
+  }
+  
+  /**
+   * Return a manifest of what finalized edit logs are available
    */
-  synchronized void divertFileStreams(String dest) throws IOException {
-    waitForSyncToFinish();
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+      throws IOException {
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
 
-    assert getNumEditStreams() >= getNumEditsDirs() :
-      "Inconsistent number of streams";
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    EditStreamIterator itE = 
-      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
-    Iterator<StorageDirectory> itD = 
-      storage.dirIterator(NameNodeDirType.EDITS);
-    while(itE.hasNext() && itD.hasNext()) {
-      EditLogOutputStream eStream = itE.next();
-      StorageDirectory sd = itD.next();
-      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
-        throw new IOException("Inconsistent order of edit streams: " + eStream);
-      try {
-        // close old stream
-        closeStream(eStream);
-        // create new stream
-        eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
-            sizeOutputFlushBuffer);
-        eStream.create();
-        // replace by the new stream
-        itE.replace(eStream);
-      } catch (IOException e) {
-        LOG.warn("Error in editStream " + eStream.getName(), e);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-      }
+    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
+      inspector.inspectDirectory(sd);
     }
-    disableAndReportErrorOnStreams(errorStreams);
+    
+    return inspector.getEditLogManifest(sinceTxId);
   }
-
+  
   /**
-   * Removes the old edit log and renames edits.new to edits.
-   * Reopens the edits file.
+   * Finalizes the current edit log and opens a new log segment.
+   * @return the transaction id of the BEGIN_LOG_SEGMENT transaction
+   * in the new log.
    */
-  synchronized void purgeEditLog() throws IOException {
-    waitForSyncToFinish();
-    revertFileStreams(
-        Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
+  synchronized long rollEditLog() throws IOException {
+    LOG.info("Rolling edit logs.");
+    endCurrentLogSegment(true);
+    
+    long nextTxId = getLastWrittenTxId() + 1;
+    startLogSegment(nextTxId, true);
+    
+    assert curSegmentTxId == nextTxId;
+    return nextTxId;
   }
-
-
+  
   /**
-   * The actual sync activity happens while not synchronized on this object.
-   * Thus, synchronized activities that require that they are not concurrent
-   * with file operations should wait for any running sync to finish.
+   * Start writing to the log segment with the given txid.
+   * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state. 
    */
-  synchronized void waitForSyncToFinish() {
-    while (isSyncRunning) {
-      try {
-        wait(1000);
-      } catch (InterruptedException ie) {}
+  synchronized void startLogSegment(final long segmentTxId,
+      boolean writeHeaderTxn) throws IOException {
+    LOG.info("Starting log segment at " + segmentTxId);
+    Preconditions.checkArgument(segmentTxId > 0,
+        "Bad txid: %s", segmentTxId);
+    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
+        "Bad state: %s", state);
+    Preconditions.checkState(segmentTxId > curSegmentTxId,
+        "Cannot start writing to log segment " + segmentTxId +
+        " when previous log segment started at " + curSegmentTxId);
+    Preconditions.checkArgument(segmentTxId == txid + 1,
+        "Cannot start log segment at txid %s when next expected " +
+        "txid is %s", segmentTxId, txid + 1);
+    
+    numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
+
+    // TODO no need to link this back to storage anymore!
+    // See HDFS-2174.
+    storage.attemptRestoreRemovedStorage();
+    
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.startLogSegment(segmentTxId);
+      }
+    }, "starting log segment " + segmentTxId);
+
+    if (countActiveJournals() == 0) {
+      throw new IOException("Unable to start log segment " +
+          segmentTxId + ": no journals successfully started.");
+    }
+    
+    curSegmentTxId = segmentTxId;
+    state = State.IN_SEGMENT;
+
+    if (writeHeaderTxn) {
+      logEdit(LogSegmentOp.getInstance(
+          FSEditLogOpCodes.OP_START_LOG_SEGMENT));
+      logSync();
     }
   }
 
   /**
-   * Revert file streams from file edits.new back to file edits.<p>
-   * Close file streams, which are currently writing into getRoot()/source.
-   * Rename getRoot()/source to edits.
-   * Reopen streams so that they start writing into edits files.
-   * @param dest new stream path relative to the storage directory root.
-   * @throws IOException
+   * Finalize the current log segment.
+   * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
    */
-  synchronized void revertFileStreams(String source) throws IOException {
-    waitForSyncToFinish();
+  synchronized void endCurrentLogSegment(boolean writeEndTxn) {
+    LOG.info("Ending log segment " + curSegmentTxId);
+    Preconditions.checkState(state == State.IN_SEGMENT,
+        "Bad state: %s", state);
+    
+    if (writeEndTxn) {
+      logEdit(LogSegmentOp.getInstance(
+          FSEditLogOpCodes.OP_END_LOG_SEGMENT));
+      logSync();
+    }
 
-    assert getNumEditStreams() >= getNumEditsDirs() :
-      "Inconsistent number of streams";
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    EditStreamIterator itE = 
-      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
-    Iterator<StorageDirectory> itD = 
-      storage.dirIterator(NameNodeDirType.EDITS);
-    while(itE.hasNext() && itD.hasNext()) {
-      EditLogOutputStream eStream = itE.next();
-      StorageDirectory sd = itD.next();
-      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
-        throw new IOException("Inconsistent order of edit streams: " + eStream +
-                              " does not start with " + sd.getRoot().getPath());
-      try {
-        // close old stream
-        closeStream(eStream);
-        // rename edits.new to edits
-        File editFile = getEditFile(sd);
-        File prevEditFile = new File(sd.getRoot(), source);
-        if(prevEditFile.exists()) {
-          if(!prevEditFile.renameTo(editFile)) {
-            //
-            // renameTo() fails on Windows if the destination
-            // file exists.
-            //
-            if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
-              throw new IOException("Rename failed for " + sd.getRoot());
-            }
-          }
+    printStatistics(true);
+    
+    final long lastTxId = getLastWrittenTxId();
+    
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        if (jas.isActive()) {
+          jas.close(lastTxId);
         }
-        // open new stream
-        eStream = new EditLogFileOutputStream(editFile, sizeOutputFlushBuffer);
-        // replace by the new stream
-        itE.replace(eStream);
-      } catch (IOException e) {
-        LOG.warn("Error in editStream " + eStream.getName(), e);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
       }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
+    }, "ending log segment");
+    
+    state = State.BETWEEN_LOG_SEGMENTS;
+  }
+  
+  /**
+   * Abort all current logs. Called from the backup node.
+   */
+  synchronized void abortCurrentLogSegment() {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.abort();
+      }
+    }, "aborting all streams");
+    state = State.BETWEEN_LOG_SEGMENTS;
   }
 
   /**
-   * Return the name of the edit file
+   * Archive any log files that are older than the given txid.
    */
-  synchronized File getFsEditName() {
-    StorageDirectory sd = null;   
-    for (Iterator<StorageDirectory> it = 
-      storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      sd = it.next();   
-      if(sd.getRoot().canRead())
-        return getEditFile(sd);
+  public void purgeLogsOlderThan(
+      final long minTxIdToKeep, final StoragePurger purger) {
+    synchronized (this) {
+      // synchronized to prevent findbugs warning about inconsistent
+      // synchronization. This will be JIT-ed out if asserts are
+      // off.
+      assert curSegmentTxId == FSConstants.INVALID_TXID || // on format this is no-op
+        minTxIdToKeep <= curSegmentTxId :
+        "cannot purge logs older than txid " + minTxIdToKeep +
+        " when current segment starts at " + curSegmentTxId;
     }
-    return null;
+    
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.manager.purgeLogsOlderThan(minTxIdToKeep, purger);
+      }
+    }, "purging logs older than " + minTxIdToKeep);
   }
 
+  
   /**
-   * Returns the timestamp of the edit log
+   * The actual sync activity happens while not synchronized on this object.
+   * Thus, synchronized activities that require that they are not concurrent
+   * with file operations should wait for any running sync to finish.
    */
-  synchronized long getFsEditTime() {
-    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
-    if(it.hasNext())
-      return getEditFile(it.next()).lastModified();
-    return 0;
+  synchronized void waitForSyncToFinish() {
+    while (isSyncRunning) {
+      try {
+        wait(1000);
+      } catch (InterruptedException ie) {}
+    }
   }
 
   /**
@@ -996,273 +921,224 @@ public class FSEditLog implements NNStor
 
 
   // sets the initial capacity of the flush buffer.
-  public void setBufferCapacity(int size) {
-    sizeOutputFlushBuffer = size;
-  }
-
-
-  boolean isEmpty() throws IOException {
-    return getEditLogSize() <= 0;
+  public void setOutputBufferCapacity(int size) {
+    for (JournalAndStream jas : journals) {
+      jas.manager.setOutputBufferCapacity(size);
+    }
   }
 
   /**
    * Create (or find if already exists) an edit output stream, which
    * streams journal records (edits) to the specified backup node.<br>
-   * Send a record, prescribing to start journal spool.<br>
-   * This should be sent via regular stream of journal records so that
-   * the backup node new exactly after which record it should start spooling.
+   * 
+   * The new BackupNode will start receiving edits the next time this
+   * NameNode's logs roll.
    * 
    * @param bnReg the backup node registration information.
    * @param nnReg this (active) name-node registration.
    * @throws IOException
    */
-  synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
-                      NamenodeRegistration nnReg) // active name-node
+  synchronized void registerBackupNode(
+      NamenodeRegistration bnReg, // backup node
+      NamenodeRegistration nnReg) // active name-node
   throws IOException {
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
-    if(editStreams == null)
-      editStreams = new ArrayList<EditLogOutputStream>();
-    EditLogOutputStream boStream = null;
-    for(EditLogOutputStream eStream : editStreams) {
-      if(eStream.getName().equals(bnReg.getAddress())) {
-        boStream = eStream; // already there
-        break;
-      }
-    }
-    if(boStream == null) {
-      boStream = new EditLogBackupOutputStream(bnReg, nnReg);
-      editStreams.add(boStream);
+    
+    JournalAndStream jas = findBackupJournalAndStream(bnReg);
+    if (jas != null) {
+      // already registered
+      LOG.info("Backup node " + bnReg + " re-registers");
+      return;
     }
-    logEdit(JSpoolStartOp.getInstance());
+    
+    LOG.info("Registering new backup node: " + bnReg);
+    BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
+    journals.add(new JournalAndStream(bjm));
   }
-
-  /**
-   * Write an operation to the edit log. Do not sync to persistent
-   * store yet.
-   */
-  synchronized void logEdit(int length, byte[] data) {
-    if(getNumEditStreams() == 0)
-      throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    long start = now();
-    for(EditLogOutputStream eStream : editStreams) {
-      try {
-        eStream.writeRaw(data, 0, length);
-      } catch (IOException ie) {
-        LOG.warn("Error in editStream " + eStream.getName(), ie);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
+  
+  synchronized void releaseBackupStream(NamenodeRegistration registration) {
+    for (Iterator<JournalAndStream> iter = journals.iterator();
+         iter.hasNext();) {
+      JournalAndStream jas = iter.next();
+      if (jas.manager instanceof BackupJournalManager &&
+          ((BackupJournalManager)jas.manager).matchesRegistration(
+              registration)) {
+        jas.abort();        
+        LOG.info("Removing backup journal " + jas);
+        iter.remove();
       }
     }
-    disableAndReportErrorOnStreams(errorStreams);
-    recordTransaction(start);
   }
-
+  
   /**
-   * Iterates output streams based of the same type.
-   * Type null will iterate over all streams.
-   */
-  private class EditStreamIterator implements Iterator<EditLogOutputStream> {
-    JournalType type;
-    int prevIndex; // for remove()
-    int nextIndex; // for next()
-
-    EditStreamIterator(JournalType streamType) {
-      this.type = streamType;
-      this.nextIndex = 0;
-      this.prevIndex = 0;
-    }
-
-    public boolean hasNext() {
-      synchronized(FSEditLog.this) {
-        if(editStreams == null || 
-           editStreams.isEmpty() || nextIndex >= editStreams.size())
-          return false;
-        while(nextIndex < editStreams.size()
-              && !editStreams.get(nextIndex).getType().isOfType(type))
-          nextIndex++;
-        return nextIndex < editStreams.size();
-      }
-    }
-
-    public EditLogOutputStream next() {
-      EditLogOutputStream stream = null;
-      synchronized(FSEditLog.this) {
-        stream = editStreams.get(nextIndex);
-        prevIndex = nextIndex;
-        nextIndex++;
-        while(nextIndex < editStreams.size()
-            && !editStreams.get(nextIndex).getType().isOfType(type))
-        nextIndex++;
-      }
-      return stream;
-    }
-
-    public void remove() {
-      nextIndex = prevIndex; // restore previous state
-      removeStream(prevIndex); // remove last returned element
-      hasNext(); // reset nextIndex to correct place
-    }
-
-    void replace(EditLogOutputStream newStream) {
-      synchronized (FSEditLog.this) {
-        assert 0 <= prevIndex && prevIndex < editStreams.size() :
-                                                          "Index out of bound.";
-        editStreams.set(prevIndex, newStream);
+   * Find the JournalAndStream associated with this BackupNode.
+   * @return null if it cannot be found
+   */
+  private synchronized JournalAndStream findBackupJournalAndStream(
+      NamenodeRegistration bnReg) {
+    for (JournalAndStream jas : journals) {
+      if (jas.manager instanceof BackupJournalManager) {
+        BackupJournalManager bjm = (BackupJournalManager)jas.manager;
+        if (bjm.matchesRegistration(bnReg)) {
+          return jas;
+        }
       }
     }
+    return null;
   }
 
   /**
-   * Get stream iterator for the specified type.
-   */
-  public Iterator<EditLogOutputStream>
-  getOutputStreamIterator(JournalType streamType) {
-    return new EditStreamIterator(streamType);
-  }
+   * Write an operation to the edit log. Do not sync to persistent
+   * store yet.
+   */   
+  synchronized void logEdit(final int length, final byte[] data) {
+    long start = beginTransaction();
+    
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        if (jas.isActive()) {
+          jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
+        }
+      }      
+    }, "Logging edit");
 
-  private void closeStream(EditLogOutputStream eStream) throws IOException {
-    eStream.setReadyToFlush();
-    eStream.flush();
-    eStream.close();
+    endTransaction(start);
   }
 
-  void incrementCheckpointTime() {
-    storage.incrementCheckpointTime();
-    CheckpointTimeOp op = CheckpointTimeOp.getInstance()
-      .setCheckpointTime(storage.getCheckpointTime());
-    logEdit(op); 
+  //// Iteration across journals
+  private interface JournalClosure {
+    public void apply(JournalAndStream jas) throws IOException;
   }
 
-  synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    Iterator<EditLogOutputStream> it =
-                                  getOutputStreamIterator(JournalType.BACKUP);
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    NamenodeRegistration backupNode = null;
-    while(it.hasNext()) {
-      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
-      backupNode = eStream.getRegistration();
-      if(backupNode.getAddress().equals(registration.getAddress()) &&
-            backupNode.isRole(registration.getRole())) {
-        errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-        break;
-      }
-    }
-    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
-      "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnStreams(errorStreams);
-  }
-
-  synchronized boolean checkBackupRegistration(
-      NamenodeRegistration registration) {
-    Iterator<EditLogOutputStream> it =
-                                  getOutputStreamIterator(JournalType.BACKUP);
-    boolean regAllowed = !it.hasNext();
-    NamenodeRegistration backupNode = null;
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    while(it.hasNext()) {
-      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
-      backupNode = eStream.getRegistration();
-      if(backupNode.getAddress().equals(registration.getAddress()) &&
-          backupNode.isRole(registration.getRole())) {
-        regAllowed = true; // same node re-registers
-        break;
-      }
-      if(!eStream.isAlive()) {
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-        regAllowed = true; // previous backup node failed
+  /**
+   * Apply the given closure across all of the journal managers, disabling
+   * any journal for which it throws an exception (any Throwable is caught).
+   * @param status message used for logging errors (e.g. "opening journal")
+   */
+  private void mapJournalsAndReportErrors(
+      JournalClosure closure, String status) {
+    List<JournalAndStream> badJAS = Lists.newLinkedList();
+    for (JournalAndStream jas : journals) {
+      try {
+        closure.apply(jas);
+      } catch (Throwable t) {
+        LOG.error("Error " + status + " (journal " + jas + ")", t);
+        badJAS.add(jas);
       }
     }
-    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
-      "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnStreams(errorStreams);
-    return regAllowed;
+
+    disableAndReportErrorOnJournals(badJAS);
   }
   
-
   /**
-   * Get the StorageDirectory for a stream
-   * @param es Stream whose StorageDirectory we wish to know
-   * @return the matching StorageDirectory
+   * Called when some journals experience an error in some operation.
+   * This propagates errors to the storage level.
    */
-  StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
-    String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
-
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      FSNamesystem.LOG.info("comparing: " + parentStorageDir 
-                            + " and " + sd.getRoot().getAbsolutePath()); 
-      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
-        return sd;
-    }
-    return null;
-  }
-
-  private synchronized void disableStream(EditLogOutputStream stream) {
-    try { stream.close(); } catch (IOException e) {
-      // nothing to do.
-      LOG.warn("Failed to close eStream " + stream.getName()
-               + " before removing it (might be ok)");
+  private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
+    if (badJournals == null || badJournals.isEmpty()) {
+      return; // nothing to do
     }
-    editStreams.remove(stream);
-
-    if (editStreams.size() <= 0) {
-      String msg = "Fatal Error: All storage directories are inaccessible.";
-      LOG.fatal(msg, new IOException(msg));
-      Runtime.getRuntime().exit(-1);
+ 
+    for (JournalAndStream j : badJournals) {
+      LOG.error("Disabling journal " + j);
+      j.abort();
     }
   }
 
   /**
-   * Error Handling on a storageDirectory
-   *
+   * Container for a JournalManager paired with its currently
+   * active stream.
+   * 
+   * If a Journal gets disabled due to an error writing to its
+   * stream, then the stream will be aborted and set to null.
    */
-  // NNStorageListener Interface
-  @Override // NNStorageListener
-  public synchronized void errorOccurred(StorageDirectory sd)
-      throws IOException {
-    if (editStreams == null) {
-      //errors can occur on storage directories 
-      //before edit streams have been set up
-      return;
+  static class JournalAndStream {
+    private final JournalManager manager;
+    private EditLogOutputStream stream;
+    private long segmentStartsAtTxId = FSConstants.INVALID_TXID;
+    
+    private JournalAndStream(JournalManager manager) {
+      this.manager = manager;
     }
-    ArrayList<EditLogOutputStream> errorStreams
-      = new ArrayList<EditLogOutputStream>();
 
-    for (EditLogOutputStream eStream : editStreams) {
-      LOG.error("Unable to log edits to " + eStream.getName()
-                + "; removing it");
-
-      StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
-      if (sd == streamStorageDir) {
-        errorStreams.add(eStream);
+    private void startLogSegment(long txId) throws IOException {
+      Preconditions.checkState(stream == null);
+      stream = manager.startLogSegment(txId);
+      segmentStartsAtTxId = txId;
+    }
+
+    private void close(long lastTxId) throws IOException {
+      Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
+          "invalid segment: lastTxId %s >= " +
+          "segment starting txid %s", lastTxId, segmentStartsAtTxId);
+          
+      if (stream == null) return;
+      stream.close();
+      manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
+      stream = null;
+    }
+    
+    private void abort() {
+      if (stream == null) return;
+      try {
+        stream.abort();
+      } catch (IOException ioe) {
+        LOG.error("Unable to abort stream " + stream, ioe);
       }
+      stream = null;
+      segmentStartsAtTxId = FSConstants.INVALID_TXID;
     }
 
-    for (EditLogOutputStream eStream : errorStreams) {
-      disableStream(eStream);
+    private boolean isActive() {
+      return stream != null;
     }
-  }
 
-  @Override // NNStorageListener
-  public synchronized void formatOccurred(StorageDirectory sd)
-      throws IOException {
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-      createEditLogFile(NNStorage.getStorageFile(sd, NameNodeFile.EDITS));
+    @VisibleForTesting
+    EditLogOutputStream getCurrentStream() {
+      return stream;
     }
-  };
+    
+    @Override
+    public String toString() {
+      return "JournalAndStream(mgr=" + manager +
+        ", " + "stream=" + stream + ")";
+    }
+
+    @VisibleForTesting
+    void setCurrentStreamForTests(EditLogOutputStream stream) {
+      this.stream = stream;
+    }
+    
+    @VisibleForTesting
+    JournalManager getManager() {
+      return manager;
+    }
+
+    private EditLogInputStream getInProgressInputStream() throws IOException {
+      return manager.getInProgressInputStream(segmentStartsAtTxId);
+    }
+  }
 
-  @Override // NNStorageListener
-  public synchronized void directoryAvailable(StorageDirectory sd)
+  /**
+   * @return an EditLogInputStream that reads from the same log that
+   * the edit log is currently writing. This is used from the BackupNode
+   * during edits synchronization.
+   * @throws IOException if no valid logs are available.
+   */
+  synchronized EditLogInputStream getInProgressFileInputStream()
       throws IOException {
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-      File eFile = getEditFile(sd);
-      addNewEditLogStream(eFile);
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+      try {
+        EditLogInputStream in = jas.getInProgressInputStream();
+        if (in != null) return in;
+      } catch (IOException ioe) {
+        LOG.warn("Unable to get the in-progress input stream from " + jas,
+            ioe);
+      }
     }
+    throw new IOException("No in-progress stream provided edits");
   }
 }
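
The refactoring above replaces per-stream error bookkeeping with a single map-and-disable step: each edit-log operation is expressed as a JournalClosure, applied to every JournalAndStream in turn, and any journal whose closure throws is aborted while the rest keep going. A minimal standalone sketch of that pattern, with JournalSet, Journal, and JournalOp as illustrative stand-ins rather than the HDFS classes above:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class JournalSet {
      interface Journal {
        void write(byte[] data) throws IOException;
        void abort();
      }

      interface JournalOp {
        void apply(Journal j) throws IOException;
      }

      private final List<Journal> journals = new ArrayList<Journal>();

      // Apply op to every journal; collect the failures, then abort
      // only those, mirroring disableAndReportErrorOnJournals() above.
      void mapAndDisableOnError(JournalOp op, String status) {
        List<Journal> bad = new ArrayList<Journal>();
        for (Journal j : journals) {
          try {
            op.apply(j);
          } catch (Throwable t) {
            // Keep going: one bad journal must not fail the operation.
            System.err.println("Error " + status + " on " + j + ": " + t);
            bad.add(j);
          }
        }
        for (Journal j : bad) {
          j.abort();
          journals.remove(j);
        }
      }
    }

A caller would pass an anonymous JournalOp that writes the record, as logEdit() does above, so the iteration and the error policy live in exactly one place.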

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Jul 29 16:28:45 2011
@@ -21,12 +21,12 @@ import static org.apache.hadoop.hdfs.ser
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
-import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
-import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -37,6 +37,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Reader;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
@@ -71,75 +73,42 @@ public class FSEditLogLoader {
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  int loadFSEdits(EditLogInputStream edits) throws IOException {
+  int loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
+  throws IOException {
     long startTime = now();
-    int numEdits = loadFSEdits(edits, true);
+    int numEdits = loadFSEdits(edits, true, expectedStartingTxId);
     FSImage.LOG.info("Edits file " + edits.getName() 
         + " of size " + edits.length() + " edits # " + numEdits 
         + " loaded in " + (now()-startTime)/1000 + " seconds.");
     return numEdits;
   }
 
-  /**
-   * Read the header of fsedit log
-   * @param in fsedit stream
-   * @return the edit log version number
-   * @throws IOException if error occurs
-   */
-  int readLogVersion(DataInputStream in) throws IOException {
-    int logVersion = 0;
-    // Read log file version. Could be missing.
-    in.mark(4);
-    // If edits log is greater than 2G, available method will return negative
-    // numbers, so we avoid having to call available
-    boolean available = true;
-    try {
-      logVersion = in.readByte();
-    } catch (EOFException e) {
-      available = false;
-    }
-    if (available) {
-      in.reset();
-      logVersion = in.readInt();
-      if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-        throw new IOException(
-            "Unexpected version of the file system log file: "
-            + logVersion + ". Current version = "
-            + FSConstants.LAYOUT_VERSION + ".");
-    }
-    assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-      "Unsupported version " + logVersion;
-    return logVersion;
-  }
-  
-  int loadFSEdits(EditLogInputStream edits, boolean closeOnExit) throws IOException {
+  int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
+      long expectedStartingTxId)
+  throws IOException {
     BufferedInputStream bin = new BufferedInputStream(edits);
     DataInputStream in = new DataInputStream(bin);
 
     int numEdits = 0;
-    int logVersion = 0;
 
     try {
-      logVersion = readLogVersion(in);
-      Checksum checksum = null;
-      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
-        checksum = FSEditLog.getChecksum();
-        in = new DataInputStream(new CheckedInputStream(bin, checksum));
-      }
-
-      numEdits = loadEditRecords(logVersion, in, checksum, false);
+      LogHeader header = LogHeader.read(in);
+      numEdits = loadEditRecords(
+          header.logVersion, in, header.checksum, false,
+          expectedStartingTxId);
     } finally {
       if(closeOnExit)
         in.close();
     }
-    if (logVersion != FSConstants.LAYOUT_VERSION) // other version
-      numEdits++; // save this image asap
+    
     return numEdits;
   }
 
   @SuppressWarnings("deprecation")
   int loadEditRecords(int logVersion, DataInputStream in,
-      Checksum checksum, boolean closeOnExit) throws IOException {
+                      Checksum checksum, boolean closeOnExit,
+                      long expectedStartingTxId)
+      throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
 
@@ -162,6 +131,8 @@ public class FSEditLogLoader {
     Arrays.fill(recentOpcodeOffsets, -1);
 
     try {
+      long txId = expectedStartingTxId - 1;
+
       try {
         FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion,
                                                            checksum);
@@ -169,6 +140,15 @@ public class FSEditLogLoader {
         while ((op = reader.readOp()) != null) {
           recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
               tracker.getPos();
+          if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+            long thisTxId = op.txid;
+            if (thisTxId != txId + 1) {
+              throw new IOException("Expected transaction ID " +
+                  (txId + 1) + " but got " + thisTxId);
+            }
+            txId = thisTxId;
+          }
+
           numEdits++;
           switch (op.opCode) {
           case OP_ADD:
@@ -417,6 +397,12 @@ public class FSEditLogLoader {
                 reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
             break;
           }
+          case OP_START_LOG_SEGMENT:
+          case OP_END_LOG_SEGMENT: {
+            // no data in here currently.
+            numOpOther++;
+            break;
+          }
           case OP_DATANODE_ADD:
           case OP_DATANODE_REMOVE:
             numOpOther++;
@@ -495,6 +481,61 @@ public class FSEditLogLoader {
   }
   
   /**
+   * Return the number of valid transactions in the file. If the file is
+   * truncated partway through the header, returns a validation indicating
+   * 0 valid transactions.
+   * @throws IOException if the file cannot be read due to an IO error
+   *                     (e.g. if the log does not exist)
+   */
+  static EditLogValidation validateEditLog(File f) throws IOException {
+    FileInputStream fis = new FileInputStream(f);
+    try {
+      PositionTrackingInputStream tracker = new PositionTrackingInputStream(
+          new BufferedInputStream(fis));
+      DataInputStream dis = new DataInputStream(tracker);
+      LogHeader header; 
+      try {
+        header = LogHeader.read(dis);
+      } catch (Throwable t) {
+        FSImage.LOG.debug("Unable to read header from " + f +
+            " -> no valid transactions in this file.");
+        return new EditLogValidation(0, 0);
+      }
+      
+      Reader reader = new FSEditLogOp.Reader(dis, header.logVersion, header.checksum);
+      long numValid = 0;
+      long lastPos = 0;
+      try {
+        while (true) {
+          lastPos = tracker.getPos();
+          if (reader.readOp() == null) {
+            break;
+          }
+          numValid++;
+        }
+      } catch (Throwable t) {
+        // Catch Throwable and not just IOE, since bad edits may generate
+        // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
+        FSImage.LOG.debug("Caught exception after reading " + numValid +
+            " ops from " + f + " while determining its valid length.", t);
+      }
+      return new EditLogValidation(lastPos, numValid);
+    } finally {
+      fis.close();
+    }
+  }
+  
+  static class EditLogValidation {
+    long validLength;
+    long numTransactions;
+    
+    EditLogValidation(long validLength, long numTransactions) {
+      this.validLength = validLength;
+      this.numTransactions = numTransactions;
+    }
+  }
+
+  /**
    * Stream wrapper that keeps track of the current file position.
    */
   private static class PositionTrackingInputStream extends FilterInputStream {

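The validation path added to FSEditLogLoader reads ops until the first failure and remembers the stream position after the last good one, so a corrupt tail can be measured without trusting the file length. A condensed sketch of that loop, assuming a hypothetical OpSource in place of FSEditLogOp.Reader plus the position-tracking stream:

    import java.io.IOException;

    interface OpSource {
      long getPos();                        // current stream position
      Object readOp() throws IOException;   // null at a clean end of log
    }

    final class ValidationSketch {
      // Returns {validLength, numTransactions}, mirroring EditLogValidation.
      static long[] countValid(OpSource src) {
        long numValid = 0;
        long lastPos = 0;
        try {
          while (true) {
            lastPos = src.getPos();
            if (src.readOp() == null) {
              break;                        // clean end of log
            }
            numValid++;
          }
        } catch (Throwable t) {
          // Bad edits may surface as NumberFormatException, AssertionError,
          // OutOfMemoryError, etc., so Throwable rather than IOException
          // marks the end of the valid region.
        }
        return new long[] { lastPos, numValid };
      }
    }

Everything before lastPos is trusted; everything after it can be treated as garbage by the caller.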
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Jul 29 16:28:45 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 import java.util.EnumMap;
 
@@ -29,10 +30,12 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.Storage;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -61,6 +64,8 @@ import java.io.EOFException;
 @InterfaceStability.Unstable
 public abstract class FSEditLogOp {
   final FSEditLogOpCodes opCode;
+  long txid;
+
 
   @SuppressWarnings("deprecation")
   private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
@@ -93,8 +98,10 @@ public abstract class FSEditLogOp {
         instances.put(OP_CANCEL_DELEGATION_TOKEN, 
                       new CancelDelegationTokenOp());
         instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
-        instances.put(OP_CHECKPOINT_TIME, new CheckpointTimeOp());
-        instances.put(OP_JSPOOL_START, new JSpoolStartOp());
+        instances.put(OP_START_LOG_SEGMENT,
+                      new LogSegmentOp(OP_START_LOG_SEGMENT));
+        instances.put(OP_END_LOG_SEGMENT,
+                      new LogSegmentOp(OP_END_LOG_SEGMENT));
         return instances;
       }
   };
@@ -105,6 +112,11 @@ public abstract class FSEditLogOp {
    */
   private FSEditLogOp(FSEditLogOpCodes opCode) {
     this.opCode = opCode;
+    this.txid = 0;
+  }
+
+  public void setTransactionId(long txid) {
+    this.txid = txid;
   }
 
   abstract void readFields(DataInputStream in, int logVersion)
@@ -1207,72 +1219,46 @@ public abstract class FSEditLogOp {
       this.key.readFields(in);
     }
   }
-
-  static class InvalidOp extends FSEditLogOp {
-    private InvalidOp() {
-      super(OP_INVALID);
+  
+  static class LogSegmentOp extends FSEditLogOp {
+    private LogSegmentOp(FSEditLogOpCodes code) {
+      super(code);
+      assert code == OP_START_LOG_SEGMENT ||
+             code == OP_END_LOG_SEGMENT : "Bad op: " + code;
     }
 
-    static InvalidOp getInstance() {
-      return (InvalidOp)opInstances.get().get(OP_INVALID);
+    static LogSegmentOp getInstance(FSEditLogOpCodes code) {
+      return (LogSegmentOp)opInstances.get().get(code);
     }
 
-    @Override 
-    void writeFields(DataOutputStream out) throws IOException {
-    }
-    
-    @Override
-    void readFields(DataInputStream in, int logVersion)
+    public void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      // nothing to read
-    }
-  }
-
-  static class JSpoolStartOp extends FSEditLogOp {
-    private JSpoolStartOp() {
-      super(OP_JSPOOL_START);
-    }
-
-    static JSpoolStartOp getInstance() {
-      return (JSpoolStartOp)opInstances.get().get(OP_JSPOOL_START);
+      // no data stored in these ops yet
     }
 
-    @Override 
-    void writeFields(DataOutputStream out) throws IOException {
-    }
-    
     @Override
-    void readFields(DataInputStream in, int logVersion)
-        throws IOException {
+    void writeFields(DataOutputStream out) throws IOException {
+      // no data stored
     }
   }
 
-  static class CheckpointTimeOp extends FSEditLogOp {
-    long checkpointTime;
-
-    private CheckpointTimeOp() {
-      super(OP_CHECKPOINT_TIME);            
-    }
-    
-    CheckpointTimeOp setCheckpointTime(long time) {
-      this.checkpointTime = time;
-      return this;
+  static class InvalidOp extends FSEditLogOp {
+    private InvalidOp() {
+      super(OP_INVALID);
     }
 
-    static CheckpointTimeOp getInstance() {
-      return (CheckpointTimeOp)opInstances.get()
-        .get(OP_CHECKPOINT_TIME);
+    static InvalidOp getInstance() {
+      return (InvalidOp)opInstances.get().get(OP_INVALID);
     }
 
     @Override 
     void writeFields(DataOutputStream out) throws IOException {
-      new LongWritable(checkpointTime).write(out);
     }
     
     @Override
     void readFields(DataInputStream in, int logVersion)
         throws IOException {
-      this.checkpointTime = readLong(in);
+      // nothing to read
     }
   }
 
@@ -1337,6 +1323,62 @@ public abstract class FSEditLogOp {
       return longWritable.get();
     }
   }
+  
+  /**
+   * Class to encapsulate the header at the top of a log file.
+   */
+  static class LogHeader {
+    final int logVersion;
+    final Checksum checksum;
+
+    public LogHeader(int logVersion, Checksum checksum) {
+      this.logVersion = logVersion;
+      this.checksum = checksum;
+    }
+
+    static LogHeader read(DataInputStream in) throws IOException {
+      int logVersion = 0;
+
+      logVersion = FSEditLogOp.LogHeader.readLogVersion(in);
+      Checksum checksum = null;
+      if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
+        checksum = FSEditLog.getChecksum();
+      }
+      return new LogHeader(logVersion, checksum);
+    }
+    
+    /**
+     * Read the version number from the head of an edit log stream.
+     * @param in the edit log input stream
+     * @return the edit log version number
+     * @throws IOException if an error occurs or a future version is found
+     */
+    private static int readLogVersion(DataInputStream in) throws IOException {
+      int logVersion = 0;
+      // Read log file version. Could be missing.
+      in.mark(4);
+      // If the edits log is larger than 2GB, available() can return negative
+      // values, so we avoid calling it here
+      boolean available = true;
+      try {
+        logVersion = in.readByte();
+      } catch (EOFException e) {
+        available = false;
+      }
+      if (available) {
+        in.reset();
+        logVersion = in.readInt();
+        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+          throw new IOException(
+              "Unexpected version of the file system log file: "
+              + logVersion + ". Current version = "
+              + FSConstants.LAYOUT_VERSION + ".");
+      }
+      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+        "Unsupported version " + logVersion;
+      return logVersion;
+    }
+  }
 
   /**
    * Class for writing editlog ops
@@ -1357,6 +1399,7 @@ public abstract class FSEditLogOp {
     public void writeOp(FSEditLogOp op) throws IOException {
       int start = buf.getLength();
       buf.writeByte(op.opCode.getOpCode());
+      buf.writeLong(op.txid);
       op.writeFields(buf);
       int end = buf.getLength();
       Checksum checksum = FSEditLog.getChecksum();
@@ -1384,7 +1427,12 @@ public abstract class FSEditLogOp {
     @SuppressWarnings("deprecation")
     public Reader(DataInputStream in, int logVersion,
                   Checksum checksum) {
-      this.in = in;
+      if (checksum != null) {
+        this.in = new DataInputStream(
+            new CheckedInputStream(in, checksum));
+      } else {
+        this.in = in;
+      }
       this.logVersion = logVersion;
       this.checksum = checksum;
     }
@@ -1423,9 +1471,15 @@ public abstract class FSEditLogOp {
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
+
+      if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+        // Read the txid
+        op.setTransactionId(in.readLong());
+      }
+
       op.readFields(in, logVersion);
 
-      validateChecksum(in, checksum);
+      validateChecksum(in, checksum, op.txid);
       return op;
     }
 
@@ -1433,7 +1487,8 @@ public abstract class FSEditLogOp {
      * Validate a transaction's checksum
      */
     private void validateChecksum(DataInputStream in,
-                                  Checksum checksum)
+                                  Checksum checksum,
+                                  long txid)
         throws IOException {
       if (checksum != null) {
         int calculatedChecksum = (int)checksum.getValue();
@@ -1441,7 +1496,7 @@ public abstract class FSEditLogOp {
         if (readChecksum != calculatedChecksum) {
           throw new ChecksumException(
               "Transaction is corrupt. Calculated checksum is " +
-              calculatedChecksum + " but read checksum " + readChecksum, -1);
+              calculatedChecksum + " but read checksum " + readChecksum, txid);
         }
       }
     }
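
Taken together, Writer.writeOp() and Reader.readOp() above imply a per-record layout of one opcode byte, an 8-byte transaction id (once STORED_TXIDS is supported), the op-specific payload, and a 4-byte checksum over everything before it. A hypothetical encoder for that layout, using java.util.zip.CRC32 where the real code obtains its checksum from FSEditLog.getChecksum():

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    final class RecordSketch {
      // Layout: [opcode: 1 byte][txid: 8 bytes][payload][checksum: 4 bytes]
      static byte[] encode(byte opCode, long txid, byte[] payload)
          throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(opCode);
        out.writeLong(txid);
        out.write(payload);
        // The checksum covers opcode + txid + payload and is appended
        // last, which is what lets readOp() validate the record after
        // reading its fields.
        Checksum ck = new CRC32();
        byte[] body = bos.toByteArray();
        ck.update(body, 0, body.length);
        out.writeInt((int) ck.getValue());
        return bos.toByteArray();
      }
    }

Storing the txid with each record also lets validateChecksum() report the offending transaction id instead of the old placeholder -1.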

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Fri Jul 29 16:28:45 2011
@@ -54,10 +54,8 @@ public enum FSEditLogOpCodes {
   OP_CANCEL_DELEGATION_TOKEN    ((byte) 20),
   OP_UPDATE_MASTER_KEY          ((byte) 21),
   OP_REASSIGN_LEASE             ((byte) 22),
-  // must be same as NamenodeProtocol.JA_JSPOOL_START
-  OP_JSPOOL_START               ((byte)102),
-  // must be same as NamenodeProtocol.JA_CHECKPOINT_TIME
-  OP_CHECKPOINT_TIME            ((byte)103);
+  OP_END_LOG_SEGMENT            ((byte) 23),
+  OP_START_LOG_SEGMENT          ((byte) 24);
 
   private byte opCode;
 



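With JSPOOL_START and CHECKPOINT_TIME gone, the opcode table no longer has to mirror constants from NamenodeProtocol; the two new segment-boundary ops take ordinary consecutive values. A self-contained sketch of the byte-to-enum lookup such a table implies; fromByte() here is a hypothetical helper, not necessarily an accessor the real class provides:

    enum OpCodeSketch {
      OP_REASSIGN_LEASE((byte) 22),
      OP_END_LOG_SEGMENT((byte) 23),
      OP_START_LOG_SEGMENT((byte) 24);

      private final byte code;

      OpCodeSketch(byte code) {
        this.code = code;
      }

      // A linear scan is fine for a table this small; a map keyed by
      // the byte value would work just as well.
      static OpCodeSketch fromByte(byte b) {
        for (OpCodeSketch op : values()) {
          if (op.code == b) {
            return op;
          }
        }
        return null;  // unknown opcode: caller rejects the record
      }
    }

As in Reader.readOp() above, a null lookup result is treated as a corrupt or invalid record rather than silently skipped.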