hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1099986 - in /hadoop/hdfs/branches/HDFS-1073: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Thu, 05 May 2011 22:33:17 GMT
Author: todd
Date: Thu May  5 22:33:16 2011
New Revision: 1099986

URL: http://svn.apache.org/viewvc?rev=1099986&view=rev
Log:
HDFS-1799. Refactor log rolling and filename management out of FSEditLog. Contributed by Ivan Kelly and Todd Lipcon.

Added:
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
Modified:
    hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt Thu May  5 22:33:16 2011
@@ -19,3 +19,5 @@ HDFS-1859. Add some convenience function
 HDFS-1894. Add constants for LAYOUT_VERSIONs in edits log branch (todd)
 HDFS-1892. Fix EditLogFileInputStream.getValidLength to be aware of OP_INVALID
            filler (todd)
+HDFS-1799. Refactor log rolling and filename management out of FSEditLog
+           (Ivan Kelly and Todd Lipcon via todd)

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu May  5 22:33:16 2011
@@ -690,6 +690,11 @@ public abstract class Storage extends St
       lock.channel().close();
       lock = null;
     }
+    
+    @Override
+    public String toString() {
+      return "Storage Directory " + this.root;
+    }
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Thu May  5 22:33:16 2011
@@ -243,7 +243,7 @@ public class BackupNode extends NameNode
     }
     if(namesystem == null || namesystem.dir == null || getFSImage() == null)
       return true;
-    return fsImage.getEditLog().getNumEditStreams() == 0;
+    return false; // TODO fsImage.getEditLog().getNumJournals() == 0;
   }
 
   private NamespaceInfo handshake(Configuration conf) throws IOException {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Thu May  5 22:33:16 2011
@@ -134,6 +134,12 @@ class EditLogBackupOutputStream extends 
     bufCurrent = bufReady = null;
   }
 
+  @Override
+  public void abort() throws IOException {
+    RPC.stopProxy(backupNode);
+    bufCurrent = bufReady = null;
+  }
+
   @Override // EditLogOutputStream
   void setReadyToFlush() throws IOException {
     assert bufReady.size() == 0 : "previous data is not flushed yet";

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Thu May  5 22:33:16 2011
@@ -26,8 +26,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.zip.Checksum;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -35,6 +38,8 @@ import org.apache.hadoop.io.Writable;
  * stores edits in a local file.
  */
 class EditLogFileOutputStream extends EditLogOutputStream {
+  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);;
+
   private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
 
   private File file;
@@ -43,6 +48,7 @@ class EditLogFileOutputStream extends Ed
   private DataOutputBuffer bufCurrent; // current buffer for writing
   private DataOutputBuffer bufReady; // buffer ready for flushing
   final private int initBufferSize; // inital buffer size
+
   static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
 
   static {
@@ -128,6 +134,9 @@ class EditLogFileOutputStream extends Ed
 
   @Override
   public void close() throws IOException {
+    setReadyToFlush();
+    flush();
+    
     // close should have been called after all pending transactions
     // have been flushed & synced.
     int bufSize = bufCurrent.size();
@@ -143,6 +152,13 @@ class EditLogFileOutputStream extends Ed
     fp.close();
 
     bufCurrent = bufReady = null;
+    fp = null;
+  }
+  
+  @Override
+  public void abort() throws IOException {
+    IOUtils.cleanup(LOG, fp);
+    fp = null;
   }
 
   /**
@@ -221,4 +237,11 @@ class EditLogFileOutputStream extends Ed
   File getFile() {
     return file;
   }
+
+  /**
+   * @return true if this stream is currently open.
+   */
+  public boolean isOpen() {
+    return fp != null;
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Thu May  5 22:33:16 2011
@@ -64,6 +64,12 @@ implements JournalStream {
   abstract public void close() throws IOException;
 
   /**
+   * Close the stream without necessarily flushing any pending data.
+   * This may be called after a previous write or close threw an exception.
+   */
+  abstract public void abort() throws IOException;
+  
+  /**
    * All data that has been written to the stream so far will be flushed.
    * New data can be still written to the stream while flushing is performed.
    */

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu May  5 22:33:16 2011
@@ -34,13 +34,11 @@ import org.apache.hadoop.hdfs.Deprecated
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
-import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -51,7 +49,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.PureJavaCrc32;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 
@@ -68,8 +68,6 @@ public class FSEditLog implements NNStor
 
   private static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
-  private volatile int sizeOutputFlushBuffer = 512*1024;
-
   private enum State {
     UNINITIALIZED,
     WRITING_EDITS,
@@ -78,7 +76,8 @@ public class FSEditLog implements NNStor
   }  
   private State state = State.UNINITIALIZED;
 
-  private ArrayList<EditLogOutputStream> editStreams = new ArrayList<EditLogOutputStream>();
+
+  private List<JournalAndStream> journals = Lists.newArrayList();
 
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
@@ -137,67 +136,33 @@ public class FSEditLog implements NNStor
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
   }
-
+  
   /**
    * Initialize the list of edit journals
    */
-  private void initJournals() throws IOException {
-    assert editStreams.isEmpty();
-
-    Preconditions.checkState(state == State.UNINITIALIZED || state == State.CLOSED,
+  private void initJournals() {
+    assert journals.isEmpty();
+    Preconditions.checkState(state == State.UNINITIALIZED,
         "Bad state: %s", state);
-
-    ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it 
-         = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File eFile = getEditFile(sd);
-      try {
-        addNewEditLogStream(eFile);
-      } catch (IOException e) {
-        LOG.warn("Unable to open edit log file " + eFile);
-        // Remove the directory from list of storage directories
-        if(al == null) al = new ArrayList<StorageDirectory>(1);
-        al.add(sd);
-        
-      }
-    }    
-    if(al != null) storage.reportErrorsOnDirectories(al);
     
-    if (editStreams.isEmpty()) {
+    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
+      journals.add(new JournalAndStream(new FileJournalManager(sd)));
+    }
+    
+    if (journals.isEmpty()) {
       LOG.error("No edits directories configured!");
     }
     
     state = State.CLOSED;
   }
- 
-  private File getEditFile(StorageDirectory sd) {
-    return storage.getEditFile(sd);
-  }
-  
-  private File getEditNewFile(StorageDirectory sd) {
-    return storage.getEditNewFile(sd);
-  }
   
   private int getNumEditsDirs() {
    return storage.getNumStorageDirs(NameNodeDirType.EDITS);
   }
 
-  synchronized int getNumEditStreams() {
-    return editStreams == null ? 0 : editStreams.size();
-  }
-
-  /**
-   * Return the currently active edit streams.
-   * This should be used only by unit tests.
-   */
-  ArrayList<EditLogOutputStream> getEditStreams() {
-    return editStreams;
-  }
-
-  boolean isOpen() {
+  synchronized boolean isOpen() {
     return state == State.WRITING_EDITS ||
-      state == State.WRITING_EDITS_NEW;
+           state == State.WRITING_EDITS_NEW;
   }
 
   /**
@@ -207,8 +172,7 @@ public class FSEditLog implements NNStor
    * @throws IOException
    */
   synchronized void open() throws IOException {
-    if (state == State.UNINITIALIZED
-	|| state == State.CLOSED) {
+    if (state == State.UNINITIALIZED) {
       initJournals();
     }
     
@@ -217,20 +181,24 @@ public class FSEditLog implements NNStor
 
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
     
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.open();
+      }
+    
+    }, "Opening logs");
+    
     state = State.WRITING_EDITS;
   }
   
-  synchronized void addNewEditLogStream(File eFile) throws IOException {
-    EditLogOutputStream eStream = new EditLogFileOutputStream(eFile,
-        sizeOutputFlushBuffer);
-    editStreams.add(eStream);
-  }
-
+  // TODO remove me!
+  @Deprecated
   synchronized void createEditLogFile(File name) throws IOException {
     waitForSyncToFinish();
 
     EditLogOutputStream eStream = new EditLogFileOutputStream(name,
-        sizeOutputFlushBuffer);
+        1024);
     eStream.create();
     eStream.close();
   }
@@ -243,120 +211,40 @@ public class FSEditLog implements NNStor
       LOG.warn("Closing log when already closed", new Exception());
       return;
     }
-
+    
     waitForSyncToFinish();
-    if (editStreams.isEmpty()) {
+    if (journals.isEmpty()) {
       return;
     }
 
     printStatistics(true);
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
 
-    mapStreamsAndReportErrors(new StreamClosure() {
-        @Override
-        public void apply(EditLogOutputStream stream) throws IOException {
-          closeStream(stream);
-        }      
-      }, "Closing stream");
-   
-    editStreams.clear();
-
-    state = State.CLOSED;
-  }
-
-  /**
-   * Close and remove edit log stream.
-   * @param index of the stream
-   */
-  synchronized private void removeStream(int index) {
-    EditLogOutputStream eStream = editStreams.get(index);
-    try {
-      eStream.close();
-    } catch (Exception e) {}
-    editStreams.remove(index);
-  }
-
-  /**
-   * The specified streams have IO errors. Close and remove them.
-   */
-  synchronized
-  void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
-    if (errorStreams == null || errorStreams.size() == 0) {
-      return;                       // nothing to do
-    }
-    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
-    for (EditLogOutputStream e : errorStreams) {
-      if (e.getType() == JournalType.FILE) {
-        errorDirs.add(getStorageDirectoryForStream(e));
-      } else {
-        disableStream(e);
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.close();
       }
-    }
-
-    try {
-      storage.reportErrorsOnDirectories(errorDirs);
-    } catch (IOException ioe) {
-      LOG.error("Problem erroring streams " + ioe);
-    }
-  }
-
-
-  /**
-   * get an editStream corresponding to a sd
-   * @param es - stream to remove
-   * @return the matching stream
-   */
-  StorageDirectory getStorage(EditLogOutputStream es) {
-    String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
-    .getParentFile().getParentFile().getAbsolutePath();
+    }, "closing journal");
 
-    Iterator<StorageDirectory> it = storage.dirIterator(); 
-    while (it.hasNext()) {
-      StorageDirectory sd = it.next();
-      LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath()); 
-      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
-        return sd;
-    }
-    return null;
-  }
-  
-  /**
-   * get an editStream corresponding to a sd
-   * @param sd
-   * @return the matching stream
-   */
-  synchronized EditLogOutputStream getEditsStream(StorageDirectory sd) {
-    for (EditLogOutputStream es : editStreams) {
-      File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
-        .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
-        return es;
-    }
-    return null;
-  }
-
-  /**
-   * check if edits.new log exists in the specified stoorage directory
-   */
-  boolean existsNew(StorageDirectory sd) {
-    return getEditNewFile(sd).exists(); 
+    state = State.CLOSED;
   }
 
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-
   void logEdit(final FSEditLogOpCodes opCode, final Writable ... writables) {
-    assert state != State.UNINITIALIZED && state != State.CLOSED;
-
+    assert state != State.CLOSED;
+    
     synchronized (this) {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
       
-      if(getNumEditStreams() == 0)
+      if (journals.isEmpty()) {
         throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-
+      }
+      
       // Only start a new transaction for OPs which will be persisted to disk.
       // Obviously this excludes control op codes.
       long start = now();
@@ -364,14 +252,16 @@ public class FSEditLog implements NNStor
         start = beginTransaction();
       }
 
-      mapStreamsAndReportErrors(new StreamClosure() {
-          @Override
-          public void apply(EditLogOutputStream stream) throws IOException {
-            if(!stream.isOperationSupported(opCode.getOpCode()))
-              return;
-            stream.write(opCode.getOpCode(), txid, writables);
-          }      
-        }, "Writing op to stream");
+      mapJournalsAndReportErrors(new JournalClosure() {
+        @Override 
+        public void apply(JournalAndStream jas) throws IOException {
+          if (!jas.isActive()) return;
+          EditLogOutputStream stream = jas.stream;
+          if(!stream.isOperationSupported(opCode.getOpCode()))
+            return;
+          stream.write(opCode.getOpCode(), txid, writables);
+        }
+      }, "logging edit");
 
       endTransaction(start);
       
@@ -416,8 +306,10 @@ public class FSEditLog implements NNStor
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (EditLogOutputStream eStream : editStreams) {
-      if (eStream.shouldForceSync()) {
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+
+      if (jas.getCurrentStream().shouldForceSync()) {
         return true;
       }
     }
@@ -510,12 +402,15 @@ public class FSEditLog implements NNStor
    * waitForSyncToFinish() before assuming they are running alone.
    */
   public void logSync() {
-    ArrayList<EditLogOutputStream> errorStreams = null;
     long syncStart = 0;
 
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
-    ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
+    
+    List<JournalAndStream> stillGoodJournals =
+      Lists.newArrayListWithCapacity(journals.size());
+    List<JournalAndStream> badJournals = Lists.newArrayList();
+    
     boolean sync = false;
     try {
       synchronized (this) {
@@ -546,20 +441,16 @@ public class FSEditLog implements NNStor
         sync = true;
   
         // swap buffers
-        assert editStreams.size() > 0 : "no editlog streams";
-        for(EditLogOutputStream eStream : editStreams) {
+        assert !journals.isEmpty() : "no editlog streams";
+        
+        for (JournalAndStream jas : journals) {
+          if (!jas.isActive()) continue;
           try {
-            eStream.setReadyToFlush();
-            streams.add(eStream);
+            jas.getCurrentStream().setReadyToFlush();
+            stillGoodJournals.add(jas);
           } catch (IOException ie) {
             LOG.error("Unable to get ready to flush.", ie);
-            //
-            // remember the streams that encountered an error.
-            //
-            if (errorStreams == null) {
-              errorStreams = new ArrayList<EditLogOutputStream>(1);
-            }
-            errorStreams.add(eStream);
+            badJournals.add(jas);
           }
         }
         } finally {
@@ -570,22 +461,20 @@ public class FSEditLog implements NNStor
   
       // do the sync
       long start = now();
-      for (EditLogOutputStream eStream : streams) {
+      for (JournalAndStream jas : stillGoodJournals) {
+        if (!jas.isActive()) continue;
         try {
-          eStream.flush();
+          jas.getCurrentStream().flush();
         } catch (IOException ie) {
           LOG.error("Unable to sync edit log.", ie);
           //
           // remember the streams that encountered an error.
           //
-          if (errorStreams == null) {
-            errorStreams = new ArrayList<EditLogOutputStream>(1);
-          }
-          errorStreams.add(eStream);
+          badJournals.add(jas);
         }
       }
       long elapsed = now() - start;
-      disableAndReportErrorOnStreams(errorStreams);
+      disableAndReportErrorOnJournals(badJournals);
   
       if (metrics != null) // Metrics non-null only when used inside name node
         metrics.syncs.inc(elapsed);
@@ -609,7 +498,7 @@ public class FSEditLog implements NNStor
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (editStreams == null || editStreams.size()==0) {
+    if (journals.isEmpty()) {
       return;
     }
     lastPrintTime = now;
@@ -621,12 +510,17 @@ public class FSEditLog implements NNStor
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    buf.append(editStreams.get(0).getNumSync());
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+      buf.append(jas.getCurrentStream().getNumSync());
+      break;
+    }
+
     buf.append(" SyncTimes(ms): ");
 
-    int numEditStreams = editStreams.size();
-    for (int idx = 0; idx < numEditStreams; idx++) {
-      EditLogOutputStream eStream = editStreams.get(idx);
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+      EditLogOutputStream eStream = jas.getCurrentStream();
       buf.append(eStream.getTotalSyncTime());
       buf.append(" ");
     }
@@ -834,29 +728,40 @@ public class FSEditLog implements NNStor
    * Return the size of the current EditLog
    */
   synchronized long getEditLogSize() throws IOException {
-    assert getNumEditsDirs() <= getNumEditStreams() : 
+    assert getNumEditsDirs() <= journals.size() :
         "Number of edits directories should not exceed the number of streams.";
-    long size = 0;
-    ArrayList<EditLogOutputStream> al = null;
-    for (int idx = 0; idx < getNumEditStreams(); idx++) {
-      EditLogOutputStream es = editStreams.get(idx);
+    long size = -1;
+        
+    List<JournalAndStream> badJournals = Lists.newArrayList();
+    
+    for (JournalAndStream j : journals) {
+      if (!j.isActive()) continue;
+      EditLogOutputStream es = j.getCurrentStream();
       try {
         long curSize = es.length();
         assert (size == 0 || size == curSize || curSize ==0) :
           "Wrong streams size";
         size = Math.max(size, curSize);
       } catch (IOException e) {
-        LOG.error("getEditLogSize: editstream.length failed. removing editlog (" +
-            idx + ") " + es.getName());
-        if(al==null) al = new ArrayList<EditLogOutputStream>(1);
-        al.add(es);
+        LOG.error("getEditLogSize: editstream.length failed. removing journal " + j, e);
+        badJournals.add(j);
       }
     }
-    if(al!=null) disableAndReportErrorOnStreams(al);
+    disableAndReportErrorOnJournals(badJournals);
+    
+    assert size != -1;
     return size;
   }
   
   /**
+   * Used only by unit tests.
+   */
+  @VisibleForTesting
+  List<JournalAndStream> getJournals() {
+    return journals;
+  }
+  
+  /**
    * Return a manifest of what finalized edit logs are available
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
@@ -886,22 +791,6 @@ public class FSEditLog implements NNStor
     }
 
     waitForSyncToFinish();
-    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
-    if(!it.hasNext()) 
-      return;
-    //
-    // If edits.new already exists in some directory, verify it
-    // exists in all directories.
-    //
-    boolean alreadyExists = existsNew(it.next());
-    while(it.hasNext()) {
-      StorageDirectory sd = it.next();
-      if(alreadyExists != existsNew(sd))
-        throw new IOException(getEditNewFile(sd) 
-              + "should " + (alreadyExists ? "" : "not ") + "exist.");
-    }
-    if(alreadyExists)
-      return; // nothing to do, edits.new exists!
 
     // check if any of failed storage is now available and put it back
     storage.attemptRestoreRemovedStorage();
@@ -918,41 +807,19 @@ public class FSEditLog implements NNStor
    * @param dest new stream path relative to the storage directory root.
    * @throws IOException
    */
-  synchronized void divertFileStreams(String dest) throws IOException {
+  synchronized void divertFileStreams(final String dest) throws IOException {
     Preconditions.checkState(state == State.WRITING_EDITS,
         "Bad state: " + state);
 
     waitForSyncToFinish();
 
-    assert getNumEditStreams() >= getNumEditsDirs() :
-      "Inconsistent number of streams";
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    EditStreamIterator itE = 
-      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
-    Iterator<StorageDirectory> itD = 
-      storage.dirIterator(NameNodeDirType.EDITS);
-    while(itE.hasNext() && itD.hasNext()) {
-      EditLogOutputStream eStream = itE.next();
-      StorageDirectory sd = itD.next();
-      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
-        throw new IOException("Inconsistent order of edit streams: " + eStream);
-      try {
-        // close old stream
-        closeStream(eStream);
-        // create new stream
-        eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
-            sizeOutputFlushBuffer);
-        eStream.create();
-        // replace by the new stream
-        itE.replace(eStream);
-      } catch (IOException e) {
-        LOG.warn("Error in editStream " + eStream.getName(), e);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
+    mapJournalsAndReportErrors(new JournalClosure() {
+      
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.divertFileStreams(dest);
       }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
+    }, "Diverting file streams to " + dest);
   }
 
   /**
@@ -961,9 +828,8 @@ public class FSEditLog implements NNStor
    */
   synchronized void purgeEditLog() throws IOException {
     Preconditions.checkState(state == State.WRITING_EDITS_NEW,
-                             "Bad state: " + state);
+        "Bad state: " + state);
 
-    waitForSyncToFinish();
     revertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
     state = State.WRITING_EDITS;
@@ -991,65 +857,17 @@ public class FSEditLog implements NNStor
    * @param dest new stream path relative to the storage directory root.
    * @throws IOException
    */
-  synchronized void revertFileStreams(String source) throws IOException {
+  synchronized void revertFileStreams(final String source) throws IOException {
     waitForSyncToFinish();
 
-    assert getNumEditStreams() >= getNumEditsDirs() :
-      "Inconsistent number of streams";
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    EditStreamIterator itE = 
-      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
-    Iterator<StorageDirectory> itD = 
-      storage.dirIterator(NameNodeDirType.EDITS);
-    while(itE.hasNext() && itD.hasNext()) {
-      EditLogOutputStream eStream = itE.next();
-      StorageDirectory sd = itD.next();
-      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
-        throw new IOException("Inconsistent order of edit streams: " + eStream +
-                              " does not start with " + sd.getRoot().getPath());
-      try {
-        // close old stream
-        closeStream(eStream);
-        // rename edits.new to edits
-        File editFile = getEditFile(sd);
-        File prevEditFile = new File(sd.getRoot(), source);
-        if(prevEditFile.exists()) {
-          if(!prevEditFile.renameTo(editFile)) {
-            //
-            // renameTo() fails on Windows if the destination
-            // file exists.
-            //
-            if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
-              throw new IOException("Rename failed for " + sd.getRoot());
-            }
-          }
-        }
-        // open new stream
-        eStream = new EditLogFileOutputStream(editFile, sizeOutputFlushBuffer);
-        // replace by the new stream
-        itE.replace(eStream);
-      } catch (IOException e) {
-        LOG.warn("Error in editStream " + eStream.getName(), e);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-      }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
-  }
+    mapJournalsAndReportErrors(new JournalClosure() {
 
-  /**
-   * Return the name of the edit file
-   */
-  synchronized File getFsEditName() {
-    StorageDirectory sd = null;   
-    for (Iterator<StorageDirectory> it = 
-      storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      sd = it.next();   
-      if(sd.getRoot().canRead())
-        return getEditFile(sd);
-    }
-    return null;
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.revertFileStreams(source);
+      }
+      
+    }, "Reverting file streams to " + source);
   }
 
   /**
@@ -1058,7 +876,7 @@ public class FSEditLog implements NNStor
   synchronized long getFsEditTime() {
     Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
     if(it.hasNext())
-      return getEditFile(it.next()).lastModified();
+      return NNStorage.getEditFile(it.next()).lastModified();
     return 0;
   }
 
@@ -1072,8 +890,10 @@ public class FSEditLog implements NNStor
 
 
   // sets the initial capacity of the flush buffer.
-  public void setBufferCapacity(int size) {
-    sizeOutputFlushBuffer = size;
+  public void setOutputBufferCapacity(int size) {
+    for (JournalAndStream jas : journals) {
+      jas.manager.setOutputBufferCapacity(size);
+    }
   }
 
 
@@ -1095,6 +915,7 @@ public class FSEditLog implements NNStor
   synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
                       NamenodeRegistration nnReg) // active name-node
   throws IOException {
+    /*
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
     if(editStreams == null)
@@ -1111,6 +932,8 @@ public class FSEditLog implements NNStor
       editStreams.add(boStream);
     }
     logEdit(OP_JSPOOL_START, (Writable[])null);
+    TODO: backupnode is disabled
+    */
   }
 
   /**
@@ -1118,88 +941,22 @@ public class FSEditLog implements NNStor
    * store yet.
    */
   synchronized void logEdit(final int length, final byte[] data) {
-    if(getNumEditStreams() == 0)
+    if (journals.isEmpty())
       throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
     long start = beginTransaction();
-
-    mapStreamsAndReportErrors(new StreamClosure() {
-        @Override
-        public void apply(EditLogOutputStream stream) throws IOException {
-          stream.write(data, 0, length);
+    
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        if (jas.isActive()) {
+          jas.getCurrentStream().write(data, 0, length);
         }
-      }, "Writing op to stream");
+      }      
+    }, "Logging edit");
 
     endTransaction(start);
   }
 
-  /**
-   * Iterates output streams based of the same type.
-   * Type null will iterate over all streams.
-   */
-  private class EditStreamIterator implements Iterator<EditLogOutputStream> {
-    JournalType type;
-    int prevIndex; // for remove()
-    int nextIndex; // for next()
-
-    EditStreamIterator(JournalType streamType) {
-      this.type = streamType;
-      this.nextIndex = 0;
-      this.prevIndex = 0;
-    }
-
-    public boolean hasNext() {
-      synchronized(FSEditLog.this) {
-        if(editStreams == null || 
-           editStreams.isEmpty() || nextIndex >= editStreams.size())
-          return false;
-        while(nextIndex < editStreams.size()
-              && !editStreams.get(nextIndex).getType().isOfType(type))
-          nextIndex++;
-        return nextIndex < editStreams.size();
-      }
-    }
-
-    public EditLogOutputStream next() {
-      EditLogOutputStream stream = null;
-      synchronized(FSEditLog.this) {
-        stream = editStreams.get(nextIndex);
-        prevIndex = nextIndex;
-        nextIndex++;
-        while(nextIndex < editStreams.size()
-            && !editStreams.get(nextIndex).getType().isOfType(type))
-        nextIndex++;
-      }
-      return stream;
-    }
-
-    public void remove() {
-      nextIndex = prevIndex; // restore previous state
-      removeStream(prevIndex); // remove last returned element
-      hasNext(); // reset nextIndex to correct place
-    }
-
-    void replace(EditLogOutputStream newStream) {
-      synchronized (FSEditLog.this) {
-        assert 0 <= prevIndex && prevIndex < editStreams.size() :
-                                                          "Index out of bound.";
-        editStreams.set(prevIndex, newStream);
-      }
-    }
-  }
-
-  /**
-   * Get stream iterator for the specified type.
-   */
-  public Iterator<EditLogOutputStream>
-  getOutputStreamIterator(JournalType streamType) {
-    return new EditStreamIterator(streamType);
-  }
-
-  private void closeStream(EditLogOutputStream eStream) throws IOException {
-    eStream.setReadyToFlush();
-    eStream.flush();
-    eStream.close();
-  }
 
   void incrementCheckpointTime() {
     storage.incrementCheckpointTime();
@@ -1208,6 +965,7 @@ public class FSEditLog implements NNStor
   }
 
   synchronized void releaseBackupStream(NamenodeRegistration registration) {
+    /*
     Iterator<EditLogOutputStream> it =
                                   getOutputStreamIterator(JournalType.BACKUP);
     ArrayList<EditLogOutputStream> errorStreams = null;
@@ -1224,11 +982,14 @@ public class FSEditLog implements NNStor
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnStreams(errorStreams);
+    disableAndReportErrorOnJournals(errorStreams);
+    TODO BN currently disabled
+    */
   }
 
   synchronized boolean checkBackupRegistration(
       NamenodeRegistration registration) {
+    /*
     Iterator<EditLogOutputStream> it =
                                   getOutputStreamIterator(JournalType.BACKUP);
     boolean regAllowed = !it.hasNext();
@@ -1251,8 +1012,12 @@ public class FSEditLog implements NNStor
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnStreams(errorStreams);
+    disableAndReportErrorOnJournals(errorStreams);
     return regAllowed;
+    
+    TODO BN currently disabled
+    */
+    return false;
   }
   
   static BytesWritable toBytesWritable(Options.Rename... options) {
@@ -1262,37 +1027,63 @@ public class FSEditLog implements NNStor
     }
     return new BytesWritable(bytes);
   }
+  
+  //// Iteration across journals
+  private interface JournalClosure {
+    public void apply(JournalAndStream jas) throws IOException;
+  }
+
+  /**
+   * Apply the given function across all of the journal managers, disabling
+   * any for which the closure throws an IOException.
+   * @param status message used for logging errors (e.g. "opening journal")
+   */
+  private void mapJournalsAndReportErrors(
+      JournalClosure closure, String status) {
+    List<JournalAndStream> badJAS = Lists.newLinkedList();
+    for (JournalAndStream jas : journals) {
+      try {
+        closure.apply(jas);
+      } catch (Throwable t) {
+        LOG.error("Error " + status + " (journal " + jas + ")", t);
+        badJAS.add(jas);
+      }
+    }
 
+    disableAndReportErrorOnJournals(badJAS);
+  }
+  
   /**
-   * Get the StorageDirectory for a stream
-   * @param es Stream whose StorageDirectory we wish to know
-   * @return the matching StorageDirectory
-   */
-  StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
-    String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
-
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      FSNamesystem.LOG.info("comparing: " + parentStorageDir 
-                            + " and " + sd.getRoot().getAbsolutePath()); 
-      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
-        return sd;
-    }
-    return null;
-  }
-
-  private synchronized void disableStream(EditLogOutputStream stream) {
-    try { stream.close(); } catch (IOException e) {
-      // nothing to do.
-      LOG.warn("Failed to close eStream " + stream.getName()
-               + " before removing it (might be ok)");
-    }
-    editStreams.remove(stream);
-
-    if (editStreams.size() <= 0) {
-      String msg = "Fatal Error: All storage directories are inaccessible.";
-      LOG.fatal(msg, new IOException(msg));
-      Runtime.getRuntime().exit(-1);
+   * Called when some journals experience an error in some operation.
+   * This propagates errors to the storage level.
+   */
+  void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
+    if (badJournals == null || badJournals.isEmpty()) {
+      return; // nothing to do
+    }
+
+    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
+    for (JournalAndStream j : badJournals) {
+      LOG.error("Disabling journal " + j);
+      StorageDirectory sd = j.getStorageDirectory();
+      if (sd != null) {
+        errorDirs.add(sd);
+        // We will report this error to storage, which will propagate back
+        // to our listener interface, at which point we'll mark it faulty.
+      } else {
+        // Just mark it faulty ourselves, since it's not associated with a
+        // storage directory.
+        markJournalFaulty(j);
+      }
+    }
+  }
+
+  private synchronized void markJournalFaulty(JournalAndStream jas) {
+    try {
+      jas.abort();
+    } catch (IOException e) {
+      LOG.warn("Failed to abort faulty journal " + jas
+          + " before removing it (might be OK)", e);
     }
   }
 
@@ -1304,22 +1095,18 @@ public class FSEditLog implements NNStor
   @Override // NNStorageListener
   public synchronized void errorOccurred(StorageDirectory sd)
       throws IOException {
-    ArrayList<EditLogOutputStream> errorStreams
-      = new ArrayList<EditLogOutputStream>();
-
-    for (EditLogOutputStream eStream : editStreams) {
-      LOG.error("Unable to log edits to " + eStream.getName()
-                + "; removing it");
-
-      StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
-      if (sd == streamStorageDir) {
-        errorStreams.add(eStream);
+    
+    LOG.debug("Error occurred on " + sd);
+    
+    for (JournalAndStream jas : journals) {
+      if (jas.getStorageDirectory() == sd) {
+        LOG.warn("Marking corresponding journal " + jas + " faulty");
+        markJournalFaulty(jas);
+        return;
       }
     }
-
-    for (EditLogOutputStream eStream : errorStreams) {
-      disableStream(eStream);
-    }
+    
+    LOG.debug("Faulty " + sd + " did not correspond to any live journal manager.");
   }
 
   @Override // NNStorageListener
@@ -1333,35 +1120,80 @@ public class FSEditLog implements NNStor
   @Override // NNStorageListener
   public synchronized void directoryAvailable(StorageDirectory sd)
       throws IOException {
-    // TODO this logic is very suspect, but will be re-done anyhow in a future
-    // patch on HDFS-1073
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)
-        && (state == State.WRITING_EDITS || state == State.WRITING_EDITS_NEW)) {
-      File eFile = getEditFile(sd);
-      addNewEditLogStream(eFile);
-    }
-  }
-  
-  //// Iteration across streams
-  private interface StreamClosure {
-    public void apply(EditLogOutputStream jm) throws IOException;
+    // We'll always just check all of the journals every time we roll.
   }
- 
+
   /**
-   * Apply the given function across all of the edit streams, disabling
-   * any for which the closure throws an IOException.
-   * @param status message used for logging errors (e.g. "opening journal")
+   * Container for a JournalManager paired with its currently
+   * active stream.
+   * 
+   * If a Journal gets disabled due to an error writing to its
+   * stream, then the stream will be aborted and set to null.
    */
-  private void mapStreamsAndReportErrors(StreamClosure closure, String status) {
-    ArrayList<EditLogOutputStream> badStreams = new ArrayList<EditLogOutputStream>();
-    for (EditLogOutputStream stream : editStreams) {
+  static class JournalAndStream {
+    private final JournalManager manager;
+    private EditLogOutputStream stream;
+    
+    public JournalAndStream(JournalManager manager) {
+      this.manager = manager;
+    }
+
+    public void open() throws IOException {
+      Preconditions.checkState(stream == null);
+      stream = manager.createStream();
+    }
+    
+    public void divertFileStreams(String dest) throws IOException {
+      if (stream != null) {
+        close();
+      }
+      stream = manager.createDivertedStream(dest);
+    }
+
+    public void revertFileStreams(String source) throws IOException {
+      if (stream != null) {
+        close();
+      }
+      stream = manager.createRevertedStream(source);
+    }
+
+    public StorageDirectory getStorageDirectory() {
+      return manager.getStorageDirectory();
+    }
+
+    public void close() throws IOException {
+      if (stream == null) return;
+      stream.close();
+      stream = null;
+    }
+    
+    public void abort() throws IOException {
+      if (stream == null) return;
       try {
-        closure.apply(stream);
+        stream.abort();
       } catch (IOException ioe) {
-        LOG.error("Error " + status + " (stream " + stream + ")", ioe);
-        badStreams.add(stream);
+        LOG.error("Unable to abort stream " + stream, ioe);
       }
+      stream = null;
+    }
+
+    boolean isActive() {
+      return stream != null;
+    }
+    
+    EditLogOutputStream getCurrentStream() {
+      return stream;
+    }
+    
+    @Override
+    public String toString() {
+      return "JournalAndStream(mgr=" + manager +
+        ", " + "stream=" + stream + ")";
+    }
+
+    @VisibleForTesting
+    void setCurrentStreamForTests(EditLogFileOutputStream stream) {
+      this.stream = stream;
     }
-    disableAndReportErrorOnStreams(badStreams);
   }
 }

Added: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1099986&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (added)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Thu May  5 22:33:16 2011
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Journal manager for the common case of edits files being written
+ * to a storage directory.
+ */
+public class FileJournalManager implements JournalManager {
+
+  private final StorageDirectory sd;
+  private EditLogFileOutputStream currentStream;
+  private int outputBufferCapacity = 512*1024;
+
+  public FileJournalManager(StorageDirectory sd) {
+    this.sd = sd;
+  }
+  
+  private boolean isCurrentStreamClosed() {
+    return currentStream == null || !currentStream.isOpen();
+  }
+  
+  @Override
+  public EditLogOutputStream createStream() throws IOException {
+    Preconditions.checkState(isCurrentStreamClosed());
+    File eFile = NNStorage.getEditFile(sd);
+    
+    currentStream = new EditLogFileOutputStream(
+        eFile, outputBufferCapacity);
+    return currentStream;
+  }
+
+  @Override
+  public EditLogOutputStream createDivertedStream(String dest)
+    throws IOException {
+
+    // create new stream
+    currentStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
+        outputBufferCapacity);
+    currentStream.create();
+    return currentStream;    
+  }
+
+  @Override
+  public EditLogOutputStream createRevertedStream(String source)
+      throws IOException {
+    Preconditions.checkState(isCurrentStreamClosed());
+    
+    // rename edits.new to edits
+    File editFile = NNStorage.getEditFile(sd);
+    File prevEditFile = new File(sd.getRoot(), source);
+    if(prevEditFile.exists()) {
+      if(!prevEditFile.renameTo(editFile)) {
+        //
+        // renameTo() fails on Windows if the destination
+        // file exists.
+        //
+        if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
+          throw new IOException("Rename failed for " + sd.getRoot());
+        }
+      }
+    }
+    
+    return createStream();
+  }
+
+  @Override
+  public StorageDirectory getStorageDirectory() {
+    return sd;
+  }
+
+  @Override
+  public void setOutputBufferCapacity(int size) {
+    this.outputBufferCapacity = size;
+  }
+
+}

Added: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1099986&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (added)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Thu May  5 22:33:16 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+/**
+ * A JournalManager is responsible for managing a single place for storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the EditLog is first opened.
+ */
+public interface JournalManager {
+
+  /**
+   * @return the StorageDirectory associated with this journal,
+   * or null if this is not a disk-based journal 
+   */
+  StorageDirectory getStorageDirectory();
+
+  /**
+   * Create and return a new stream to which edits can be written.
+   */
+  EditLogOutputStream createStream() throws IOException;
+
+  /**
+   * Divert streams, e.g. to "edits.new". TODO: this will go away with HDFS-1073.
+   */
+  EditLogOutputStream createDivertedStream(String dest) throws IOException;
+
+  /**
+   * Revert streams, e.g. by renaming "edits.new" back to "edits". TODO: this
+   * will go away with HDFS-1073.
+   */
+  EditLogOutputStream createRevertedStream(String source) throws IOException;
+
+  /**
+   * Set the amount of memory that this stream should use to buffer edits
+   */
+  void setOutputBufferCapacity(int size);
+}

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Thu May  5 22:33:16 2011
@@ -788,14 +788,14 @@ public class NNStorage extends Storage i
   /**
    * @return A editlog File in storage directory 'sd'.
    */
-  File getEditFile(StorageDirectory sd) {
+  static File getEditFile(StorageDirectory sd) {
     return getStorageFile(sd, NameNodeFile.EDITS);
   }
 
   /**
    * @return A temporary editlog File in storage directory 'sd'.
    */
-  File getEditNewFile(StorageDirectory sd) {
+  static File getEditNewFile(StorageDirectory sd) {
     return getStorageFile(sd, NameNodeFile.EDITS_NEW);
   }
 

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Thu May  5 22:33:16 2011
@@ -189,7 +189,7 @@ public class TestEditLog extends TestCas
       FSEditLog editLog = fsimage.getEditLog();
   
       // set small size of flush buffer
-      editLog.setBufferCapacity(initialSize);
+      editLog.setOutputBufferCapacity(initialSize);
       editLog.close();
       editLog.open();
     
@@ -389,7 +389,8 @@ public class TestEditLog extends TestCas
     FSImage fsimage = namesystem.getFSImage();
     final FSEditLog editLog = fsimage.getEditLog();
     fileSys.mkdirs(new Path("/tmp"));
-    File editFile = editLog.getFsEditName();
+    StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
+    File editFile = NNStorage.getStorageFile(sd, NameNodeFile.EDITS);
     editLog.close();
     cluster.shutdown();
       long fileLen = editFile.length();

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java Thu May  5 22:33:16 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.junit.Test;
 
 public class TestEditLogFileOutputStream {
@@ -39,7 +40,8 @@ public class TestEditLogFileOutputStream
     NameNode.format(conf);
     NameNode nn = new NameNode(conf);
 
-    File editLog = nn.getFSImage().getEditLog().getFsEditName();
+    StorageDirectory sd = nn.getFSImage().getStorage().getStorageDir(0);
+    File editLog = NNStorage.getEditFile(sd);
 
     assertEquals("Edit log should only be 4 bytes long",
         4, editLog.length());

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Thu May  5 22:33:16 2011
@@ -181,7 +181,7 @@ public class TestEditLogRace {
       FSEditLog editLog = fsimage.getEditLog();
 
       // set small size of flush buffer
-      editLog.setBufferCapacity(2048);
+      editLog.setOutputBufferCapacity(2048);
       editLog.close();
       editLog.open();
 
@@ -258,7 +258,7 @@ public class TestEditLogRace {
       FSEditLog editLog = fsimage.getEditLog();
 
       // set small size of flush buffer
-      editLog.setBufferCapacity(2048);
+      editLog.setOutputBufferCapacity(2048);
       editLog.close();
       editLog.open();
 
@@ -341,9 +341,10 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      ArrayList<EditLogOutputStream> streams = editLog.getEditStreams();
-      EditLogOutputStream spyElos = spy(streams.get(0));
-      streams.set(0, spyElos);
+      FSEditLog.JournalAndStream jas = editLog.getJournals().get(0);
+      EditLogFileOutputStream spyElos =
+          spy((EditLogFileOutputStream)jas.getCurrentStream());
+      jas.setCurrentStreamForTests(spyElos);
 
       final AtomicReference<Throwable> deferredException =
           new AtomicReference<Throwable>();

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Thu May  5 22:33:16 2011
@@ -106,7 +106,7 @@ public class TestSecurityTokenEditLog ex
       FSEditLog editLog = fsimage.getEditLog();
   
       // set small size of flush buffer
-      editLog.setBufferCapacity(2048);
+      editLog.setOutputBufferCapacity(2048);
       editLog.close();
       editLog.open();
       namesystem.getDelegationTokenSecretManager().startThreads();

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java?rev=1099986&r1=1099985&r2=1099986&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java Thu May  5 22:33:16 2011
@@ -112,15 +112,6 @@ public class TestStorageRestore extends 
     // set the restore feature on
     config.setBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, true);
   }
-
-  /**
-   * clean up
-   */
-  public void tearDown() throws Exception {
-    if (hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir) ) {
-      throw new IOException("Could not delete hdfs directory in tearDown '" + hdfsDir + "'");
-    } 
-  }
   
   /**
    * invalidate storage by removing storage directories
@@ -256,8 +247,11 @@ public class TestStorageRestore extends 
       // should be different
       //assertTrue(fsImg1.length() != fsImg2.length());
       //assertTrue(fsImg1.length() != fsImg3.length());
-      assertTrue("edits1 = edits2", fsEdits1.length() != fsEdits2.length());
-      assertTrue("edits1 = edits3", fsEdits1.length() != fsEdits3.length());
+      long len1 = EditLogFileInputStream.getValidLength(fsEdits1);
+      long len2 = EditLogFileInputStream.getValidLength(fsEdits2);
+      long len3 = EditLogFileInputStream.getValidLength(fsEdits3);
+      assertTrue("edits1 = edits2", len1 != len2);
+      assertTrue("edits1 = edits3", len1 != len3);
       
       assertTrue(!md5_1.equals(md5_2));
       assertTrue(!md5_1.equals(md5_3));



Mime
View raw message