hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1089737 - in /hadoop/hdfs/branches/HDFS-1073: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Thu, 07 Apr 2011 04:46:32 GMT
Author: todd
Date: Thu Apr  7 04:46:32 2011
New Revision: 1089737

URL: http://svn.apache.org/viewvc?rev=1089737&view=rev
Log:
HDFS-1799. Refactor log rolling and filename management into a JournalManager interface. Contributed by Todd Lipcon

Added:
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
Modified:
    hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt?rev=1089737&r1=1089736&r2=1089737&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt Thu Apr  7 04:46:32 2011
@@ -12,3 +12,5 @@ HDFS-1729. Add code to detect valid leng
 HDFS-1793. Add code to inspect a storage directory with txid-based filenames
            (todd)
 HDFS-1794. Add code to list which edit logs are available on a remote NN (todd)
+HDFS-1799. Refactor log rolling and filename management into a JournalManager
+           interface. (todd)

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1089737&r1=1089736&r2=1089737&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu Apr  7 04:46:32 2011
@@ -663,6 +663,11 @@ public abstract class Storage extends St
       lock.channel().close();
       lock = null;
     }
+    
+    @Override
+    public String toString() {
+      return "Storage Directory " + this.root;
+    }
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1089737&r1=1089736&r2=1089737&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Thu Apr  7 04:46:32 2011
@@ -235,7 +235,7 @@ public class BackupNode extends NameNode
     }
     if(namesystem == null || namesystem.dir == null || getFSImage() == null)
       return true;
-    return fsImage.getEditLog().getNumEditStreams() == 0;
+    return false; // TODO fsImage.getEditLog().getNumJournals() == 0;
   }
 
   private NamespaceInfo handshake(Configuration conf) throws IOException {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1089737&r1=1089736&r2=1089737&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Apr  7 04:46:32 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -49,6 +50,9 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 
 /**
@@ -64,9 +68,17 @@ public class FSEditLog implements NNStor
 
   private static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
-  private volatile int sizeOutputFlushBuffer = 512*1024;
+  private enum State {
+    UNINITIALIZED,
+    WRITING_EDITS,
+    WRITING_EDITS_NEW,
+    CLOSED;
+  }  
+  private State state = State.UNINITIALIZED;
+
 
-  private ArrayList<EditLogOutputStream> editStreams = null;
+  private List<JournalManager> journals = Lists.newArrayList();
+  private List<JournalManager> faultyJournals = Lists.newArrayList();;
 
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
@@ -91,6 +103,7 @@ public class FSEditLog implements NNStor
 
   private NNStorage storage;
 
+
   private static class TransactionId {
     public long txid;
 
@@ -114,32 +127,33 @@ public class FSEditLog implements NNStor
     lastPrintTime = now();
   }
   
-  private File getEditFile(StorageDirectory sd) {
-    return storage.getEditFile(sd);
-  }
-  
-  private File getEditNewFile(StorageDirectory sd) {
-    return storage.getEditNewFile(sd);
+  /**
+   * Initialize the list of edit journals
+   */
+  private void initJournals() {
+    assert journals.isEmpty();
+    assert faultyJournals.isEmpty();
+    Preconditions.checkState(state == State.UNINITIALIZED,
+        "Bad state: %s", state);
+    
+    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
+      journals.add(new FileJournalManager(sd));
+    }
+    
+    if (journals.isEmpty()) {
+      LOG.error("No edits directories configured!");
+    }
+    
+    state = State.CLOSED;
   }
   
   private int getNumEditsDirs() {
    return storage.getNumStorageDirs(NameNodeDirType.EDITS);
   }
 
-  synchronized int getNumEditStreams() {
-    return editStreams == null ? 0 : editStreams.size();
-  }
-
-  /**
-   * Return the currently active edit streams.
-   * This should be used only by unit tests.
-   */
-  ArrayList<EditLogOutputStream> getEditStreams() {
-    return editStreams;
-  }
-
-  boolean isOpen() {
-    return getNumEditStreams() > 0;
+  synchronized boolean isOpen() {
+    return state == State.WRITING_EDITS ||
+           state == State.WRITING_EDITS_NEW;
   }
 
   /**
@@ -149,41 +163,33 @@ public class FSEditLog implements NNStor
    * @throws IOException
    */
   synchronized void open() throws IOException {
+    if (state == State.UNINITIALIZED) {
+      initJournals();
+    }
+    
+    Preconditions.checkState(state == State.CLOSED,
+        "Bad state: %s", state);
+
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
-    if (editStreams == null)
-      editStreams = new ArrayList<EditLogOutputStream>();
     
-    ArrayList<StorageDirectory> al = null;
-    for (Iterator<StorageDirectory> it 
-         = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File eFile = getEditFile(sd);
-      try {
-        addNewEditLogStream(eFile);
-      } catch (IOException e) {
-        LOG.warn("Unable to open edit log file " + eFile);
-        // Remove the directory from list of storage directories
-        if(al == null) al = new ArrayList<StorageDirectory>(1);
-        al.add(sd);
-        
+    mapJournalsAndReportErrors(new JournalManagerClosure() {
+      @Override
+      public void apply(JournalManager jm) throws IOException {
+        jm.open();
       }
-    }
     
-    if(al != null) storage.reportErrorsOnDirectories(al);
+    }, "Opening logs");
+    
+    state = State.WRITING_EDITS;
   }
   
-  
-  synchronized void addNewEditLogStream(File eFile) throws IOException {
-    EditLogOutputStream eStream = new EditLogFileOutputStream(eFile,
-        sizeOutputFlushBuffer);
-    editStreams.add(eStream);
-  }
-
+  // TODO remove me!
+  @Deprecated
   synchronized void createEditLogFile(File name) throws IOException {
     waitForSyncToFinish();
 
     EditLogOutputStream eStream = new EditLogFileOutputStream(name,
-        sizeOutputFlushBuffer);
+        1024);
     eStream.create();
     eStream.close();
   }
@@ -192,122 +198,45 @@ public class FSEditLog implements NNStor
    * Shutdown the file store.
    */
   synchronized void close() {
+    LOG.debug("Closing EditLog", new Exception());
+    if (state == State.CLOSED) {
+      LOG.warn("Closing log when already closed", new Exception());
+      return;
+    }
+    
     waitForSyncToFinish();
-    if (editStreams == null || editStreams.isEmpty()) {
+    if (journals.isEmpty()) {
       return;
     }
+
     printStatistics(true);
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
 
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    Iterator<EditLogOutputStream> it = getOutputStreamIterator(null);
-    while(it.hasNext()) {
-      EditLogOutputStream eStream = it.next();
-      try {
-        closeStream(eStream);
-      } catch (IOException e) {
-        LOG.warn("FSEditLog:close - failed to close stream " 
-            + eStream.getName());
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
+    mapJournalsAndReportErrors(new JournalManagerClosure() {
+      @Override
+      public void apply(JournalManager jm) throws IOException {
+        jm.close();
       }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
-    editStreams.clear();
-  }
+    }, "closing journal");
 
-  /**
-   * Close and remove edit log stream.
-   * @param index of the stream
-   */
-  synchronized private void removeStream(int index) {
-    EditLogOutputStream eStream = editStreams.get(index);
-    try {
-      eStream.close();
-    } catch (Exception e) {}
-    editStreams.remove(index);
-  }
-
-  /**
-   * The specified streams have IO errors. Close and remove them.
-   */
-  synchronized
-  void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
-    if (errorStreams == null || errorStreams.size() == 0) {
-      return;                       // nothing to do
-    }
-    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
-    for (EditLogOutputStream e : errorStreams) {
-      if (e.getType() == JournalType.FILE) {
-        errorDirs.add(getStorageDirectoryForStream(e));
-      } else {
-        disableStream(e);
-      }
-    }
-
-    try {
-      storage.reportErrorsOnDirectories(errorDirs);
-    } catch (IOException ioe) {
-      LOG.error("Problem erroring streams " + ioe);
-    }
-  }
-
-
-  /**
-   * get an editStream corresponding to a sd
-   * @param es - stream to remove
-   * @return the matching stream
-   */
-  StorageDirectory getStorage(EditLogOutputStream es) {
-    String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
-    .getParentFile().getParentFile().getAbsolutePath();
-
-    Iterator<StorageDirectory> it = storage.dirIterator(); 
-    while (it.hasNext()) {
-      StorageDirectory sd = it.next();
-      LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());
-      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
-        return sd;
-    }
-    return null;
-  }
-  
-  /**
-   * get an editStream corresponding to a sd
-   * @param sd
-   * @return the matching stream
-   */
-  synchronized EditLogOutputStream getEditsStream(StorageDirectory sd) {
-    for (EditLogOutputStream es : editStreams) {
-      File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
-        .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
-        return es;
-    }
-    return null;
-  }
-
-  /**
-   * check if edits.new log exists in the specified stoorage directory
-   */
-  boolean existsNew(StorageDirectory sd) {
-    return getEditNewFile(sd).exists(); 
+    state = State.CLOSED;
   }
 
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  void logEdit(FSEditLogOpCodes opCode, Writable ... writables) {
+  void logEdit(final FSEditLogOpCodes opCode, final Writable ... writables) {
+    assert state != State.CLOSED;
+    
     synchronized (this) {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
       
-      if(getNumEditStreams() == 0)
+      if (journals.isEmpty()) {
         throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      ArrayList<EditLogOutputStream> errorStreams = null;
-
+      }
+      
       // Only start a new transaction for OPs which will be persisted to disk.
       // Obviously this excludes control op codes.
       long start = now();
@@ -315,19 +244,16 @@ public class FSEditLog implements NNStor
         start = beginTransaction();
       }
 
-      for(EditLogOutputStream eStream : editStreams) {
-        if(!eStream.isOperationSupported(opCode.getOpCode()))
-          continue;
-        try {
-          eStream.write(opCode.getOpCode(), txid, writables);
-        } catch (IOException ie) {
-          LOG.error("logEdit: removing "+ eStream.getName(), ie);
-          if(errorStreams == null)
-            errorStreams = new ArrayList<EditLogOutputStream>(1);
-          errorStreams.add(eStream);
+      mapJournalsAndReportErrors(new JournalManagerClosure() {
+        @Override 
+        public void apply(JournalManager jm) throws IOException {
+          EditLogOutputStream stream = jm.getCurrentStream();
+          if(!stream.isOperationSupported(opCode.getOpCode()))
+            return;
+          stream.write(opCode.getOpCode(), txid, writables);
         }
-      }
-      disableAndReportErrorOnStreams(errorStreams);
+      }, "logging edit");
+
       endTransaction(start);
       
       // check if it is time to schedule an automatic sync
@@ -371,8 +297,8 @@ public class FSEditLog implements NNStor
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (EditLogOutputStream eStream : editStreams) {
-      if (eStream.shouldForceSync()) {
+    for (JournalManager j : journals) {
+      if (j.getCurrentStream().shouldForceSync()) {
         return true;
       }
     }
@@ -465,12 +391,15 @@ public class FSEditLog implements NNStor
    * waitForSyncToFinish() before assuming they are running alone.
    */
   public void logSync() {
-    ArrayList<EditLogOutputStream> errorStreams = null;
     long syncStart = 0;
 
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
-    ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
+    
+    List<JournalManager> stillGoodJournals =
+      Lists.newArrayListWithCapacity(journals.size());
+    List<JournalManager> badJournals = Lists.newArrayList();
+    
     boolean sync = false;
     try {
       synchronized (this) {
@@ -501,20 +430,15 @@ public class FSEditLog implements NNStor
         sync = true;
   
         // swap buffers
-        assert editStreams.size() > 0 : "no editlog streams";
-        for(EditLogOutputStream eStream : editStreams) {
+        assert !journals.isEmpty() : "no editlog streams";
+        
+        for (JournalManager j : journals) {
           try {
-            eStream.setReadyToFlush();
-            streams.add(eStream);
+            j.getCurrentStream().setReadyToFlush();
+            stillGoodJournals.add(j);
           } catch (IOException ie) {
             LOG.error("Unable to get ready to flush.", ie);
-            //
-            // remember the streams that encountered an error.
-            //
-            if (errorStreams == null) {
-              errorStreams = new ArrayList<EditLogOutputStream>(1);
-            }
-            errorStreams.add(eStream);
+            badJournals.add(j);
           }
         }
         } finally {
@@ -525,22 +449,19 @@ public class FSEditLog implements NNStor
   
       // do the sync
       long start = now();
-      for (EditLogOutputStream eStream : streams) {
+      for (JournalManager j : stillGoodJournals) {
         try {
-          eStream.flush();
+          j.getCurrentStream().flush();
         } catch (IOException ie) {
           LOG.error("Unable to sync edit log.", ie);
           //
           // remember the streams that encountered an error.
           //
-          if (errorStreams == null) {
-            errorStreams = new ArrayList<EditLogOutputStream>(1);
-          }
-          errorStreams.add(eStream);
+          badJournals.add(j);
         }
       }
       long elapsed = now() - start;
-      disableAndReportErrorOnStreams(errorStreams);
+      disableAndReportErrorOnJournals(badJournals);
   
       if (metrics != null) // Metrics non-null only when used inside name node
         metrics.syncs.inc(elapsed);
@@ -564,7 +485,7 @@ public class FSEditLog implements NNStor
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (editStreams == null || editStreams.size()==0) {
+    if (journals.isEmpty()) {
       return;
     }
     lastPrintTime = now;
@@ -576,12 +497,11 @@ public class FSEditLog implements NNStor
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    buf.append(editStreams.get(0).getNumSync());
+    buf.append(journals.get(0).getCurrentStream().getNumSync());
     buf.append(" SyncTimes(ms): ");
 
-    int numEditStreams = editStreams.size();
-    for (int idx = 0; idx < numEditStreams; idx++) {
-      EditLogOutputStream eStream = editStreams.get(idx);
+    for (JournalManager j : journals) {
+      EditLogOutputStream eStream = j.getCurrentStream();
       buf.append(eStream.getTotalSyncTime());
       buf.append(" ");
     }
@@ -789,29 +709,37 @@ public class FSEditLog implements NNStor
    * Return the size of the current EditLog
    */
   synchronized long getEditLogSize() throws IOException {
-    assert getNumEditsDirs() <= getNumEditStreams() : 
+    assert getNumEditsDirs() <= journals.size() :
         "Number of edits directories should not exceed the number of streams.";
     long size = 0;
-    ArrayList<EditLogOutputStream> al = null;
-    for (int idx = 0; idx < getNumEditStreams(); idx++) {
-      EditLogOutputStream es = editStreams.get(idx);
+        
+    List<JournalManager> badJournals = Lists.newArrayList();
+    
+    for (JournalManager j : journals) {
+      EditLogOutputStream es = j.getCurrentStream();
       try {
         long curSize = es.length();
         assert (size == 0 || size == curSize || curSize ==0) :
           "Wrong streams size";
         size = Math.max(size, curSize);
       } catch (IOException e) {
-        LOG.error("getEditLogSize: editstream.length failed. removing editlog (" +
-            idx + ") " + es.getName());
-        if(al==null) al = new ArrayList<EditLogOutputStream>(1);
-        al.add(es);
+        LOG.error("getEditLogSize: editstream.length failed. removing journal " + j, e);
+        badJournals.add(j);
       }
     }
-    if(al!=null) disableAndReportErrorOnStreams(al);
+    disableAndReportErrorOnJournals(badJournals);
+    
     return size;
   }
   
   /**
+   * Used only by unit tests.
+   */
+  List<JournalManager> getJournals() {
+    return journals;
+  }
+  
+  /**
    * Return a manifest of what finalized edit logs are available
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
@@ -832,29 +760,22 @@ public class FSEditLog implements NNStor
    *         in the new log
    */
   synchronized void rollEditLog() throws IOException {
-    waitForSyncToFinish();
-    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
-    if(!it.hasNext()) 
+    Preconditions.checkState(state == State.WRITING_EDITS ||
+                             state == State.WRITING_EDITS_NEW,
+                             "Bad state: %s", state);
+    if (state == State.WRITING_EDITS_NEW){
+      LOG.debug("Tried to roll edit logs when already rolled");
       return;
-    //
-    // If edits.new already exists in some directory, verify it
-    // exists in all directories.
-    //
-    boolean alreadyExists = existsNew(it.next());
-    while(it.hasNext()) {
-      StorageDirectory sd = it.next();
-      if(alreadyExists != existsNew(sd))
-        throw new IOException(getEditNewFile(sd) 
-              + "should " + (alreadyExists ? "" : "not ") + "exist.");
     }
-    if(alreadyExists)
-      return; // nothing to do, edits.new exists!
+
+    waitForSyncToFinish();
 
     // check if any of failed storage is now available and put it back
     storage.attemptRestoreRemovedStorage();
 
     divertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
+    state = State.WRITING_EDITS_NEW;
   }
 
   /**
@@ -864,48 +785,33 @@ public class FSEditLog implements NNStor
    * @param dest new stream path relative to the storage directory root.
    * @throws IOException
    */
-  synchronized void divertFileStreams(String dest) throws IOException {
+  synchronized void divertFileStreams(final String dest) throws IOException {
+    Preconditions.checkState(state == State.WRITING_EDITS,
+        "Bad state: " + state);
+
     waitForSyncToFinish();
 
-    assert getNumEditStreams() >= getNumEditsDirs() :
-      "Inconsistent number of streams";
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    EditStreamIterator itE = 
-      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
-    Iterator<StorageDirectory> itD = 
-      storage.dirIterator(NameNodeDirType.EDITS);
-    while(itE.hasNext() && itD.hasNext()) {
-      EditLogOutputStream eStream = itE.next();
-      StorageDirectory sd = itD.next();
-      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
-        throw new IOException("Inconsistent order of edit streams: " + eStream);
-      try {
-        // close old stream
-        closeStream(eStream);
-        // create new stream
-        eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
-            sizeOutputFlushBuffer);
-        eStream.create();
-        // replace by the new stream
-        itE.replace(eStream);
-      } catch (IOException e) {
-        LOG.warn("Error in editStream " + eStream.getName(), e);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
+    mapJournalsAndReportErrors(new JournalManagerClosure() {
+      
+      @Override
+      public void apply(JournalManager jm) throws IOException {
+        jm.divertFileStreams(dest);
+      }
+    }, "Diverting file streams to " + dest);
       }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
-  }
 
   /**
    * Removes the old edit log and renames edits.new to edits.
    * Reopens the edits file.
    */
   synchronized void purgeEditLog() throws IOException {
+    Preconditions.checkState(state == State.WRITING_EDITS_NEW,
+        "Bad state: " + state);
+
     waitForSyncToFinish();
     revertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
+    state = State.WRITING_EDITS;
   }
 
 
@@ -930,65 +836,17 @@ public class FSEditLog implements NNStor
    * @param dest new stream path relative to the storage directory root.
    * @throws IOException
    */
-  synchronized void revertFileStreams(String source) throws IOException {
+  synchronized void revertFileStreams(final String source) throws IOException {
     waitForSyncToFinish();
 
-    assert getNumEditStreams() >= getNumEditsDirs() :
-      "Inconsistent number of streams";
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    EditStreamIterator itE = 
-      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
-    Iterator<StorageDirectory> itD = 
-      storage.dirIterator(NameNodeDirType.EDITS);
-    while(itE.hasNext() && itD.hasNext()) {
-      EditLogOutputStream eStream = itE.next();
-      StorageDirectory sd = itD.next();
-      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
-        throw new IOException("Inconsistent order of edit streams: " + eStream +
-                              " does not start with " + sd.getRoot().getPath());
-      try {
-        // close old stream
-        closeStream(eStream);
-        // rename edits.new to edits
-        File editFile = getEditFile(sd);
-        File prevEditFile = new File(sd.getRoot(), source);
-        if(prevEditFile.exists()) {
-          if(!prevEditFile.renameTo(editFile)) {
-            //
-            // renameTo() fails on Windows if the destination
-            // file exists.
-            //
-            if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
-              throw new IOException("Rename failed for " + sd.getRoot());
-            }
-          }
-        }
-        // open new stream
-        eStream = new EditLogFileOutputStream(editFile, sizeOutputFlushBuffer);
-        // replace by the new stream
-        itE.replace(eStream);
-      } catch (IOException e) {
-        LOG.warn("Error in editStream " + eStream.getName(), e);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-      }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
-  }
+    mapJournalsAndReportErrors(new JournalManagerClosure() {
 
-  /**
-   * Return the name of the edit file
-   */
-  synchronized File getFsEditName() {
-    StorageDirectory sd = null;   
-    for (Iterator<StorageDirectory> it = 
-      storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      sd = it.next();   
-      if(sd.getRoot().canRead())
-        return getEditFile(sd);
-    }
-    return null;
+      @Override
+      public void apply(JournalManager jm) throws IOException {
+        jm.revertFileStreams(source);
+      }
+      
+    }, "Reverting file streams to " + source);
   }
 
   /**
@@ -997,7 +855,7 @@ public class FSEditLog implements NNStor
   synchronized long getFsEditTime() {
     Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
     if(it.hasNext())
-      return getEditFile(it.next()).lastModified();
+      return NNStorage.getEditFile(it.next()).lastModified();
     return 0;
   }
 
@@ -1012,7 +870,9 @@ public class FSEditLog implements NNStor
 
   // sets the initial capacity of the flush buffer.
   public void setBufferCapacity(int size) {
-    sizeOutputFlushBuffer = size;
+    for (JournalManager jm : journals) {
+      jm.setBufferCapacity(size);
+    }
   }
 
 
@@ -1034,6 +894,7 @@ public class FSEditLog implements NNStor
   synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
                       NamenodeRegistration nnReg) // active name-node
   throws IOException {
+    /*
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
     if(editStreams == null)
@@ -1050,99 +911,29 @@ public class FSEditLog implements NNStor
       editStreams.add(boStream);
     }
     logEdit(OP_JSPOOL_START, (Writable[])null);
+    TODO: backupnode is disabled
+    */
   }
 
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  synchronized void logEdit(int length, byte[] data) {
-    if(getNumEditStreams() == 0)
+  synchronized void logEdit(final int length, final byte[] data) {
+    if (journals.isEmpty())
       throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-    ArrayList<EditLogOutputStream> errorStreams = null;
     long start = beginTransaction();
-    for(EditLogOutputStream eStream : editStreams) {
-      try {
-        eStream.write(data, 0, length);
-      } catch (IOException ie) {
-        LOG.warn("Error in editStream " + eStream.getName(), ie);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-      }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
-    endTransaction(start);
-  }
-
-  /**
-   * Iterates output streams based of the same type.
-   * Type null will iterate over all streams.
-   */
-  private class EditStreamIterator implements Iterator<EditLogOutputStream> {
-    JournalType type;
-    int prevIndex; // for remove()
-    int nextIndex; // for next()
-
-    EditStreamIterator(JournalType streamType) {
-      this.type = streamType;
-      this.nextIndex = 0;
-      this.prevIndex = 0;
-    }
-
-    public boolean hasNext() {
-      synchronized(FSEditLog.this) {
-        if(editStreams == null || 
-           editStreams.isEmpty() || nextIndex >= editStreams.size())
-          return false;
-        while(nextIndex < editStreams.size()
-              && !editStreams.get(nextIndex).getType().isOfType(type))
-          nextIndex++;
-        return nextIndex < editStreams.size();
-      }
-    }
-
-    public EditLogOutputStream next() {
-      EditLogOutputStream stream = null;
-      synchronized(FSEditLog.this) {
-        stream = editStreams.get(nextIndex);
-        prevIndex = nextIndex;
-        nextIndex++;
-        while(nextIndex < editStreams.size()
-            && !editStreams.get(nextIndex).getType().isOfType(type))
-        nextIndex++;
-      }
-      return stream;
-    }
-
-    public void remove() {
-      nextIndex = prevIndex; // restore previous state
-      removeStream(prevIndex); // remove last returned element
-      hasNext(); // reset nextIndex to correct place
-    }
-
-    void replace(EditLogOutputStream newStream) {
-      synchronized (FSEditLog.this) {
-        assert 0 <= prevIndex && prevIndex < editStreams.size() :
-                                                          "Index out of bound.";
-        editStreams.set(prevIndex, newStream);
-      }
-    }
-  }
+    
+    mapJournalsAndReportErrors(new JournalManagerClosure() {
+      @Override
+      public void apply(JournalManager jm) throws IOException {
+        jm.getCurrentStream().write(data, 0, length);        
+      }      
+    }, "Logging edit");
 
-  /**
-   * Get stream iterator for the specified type.
-   */
-  public Iterator<EditLogOutputStream>
-  getOutputStreamIterator(JournalType streamType) {
-    return new EditStreamIterator(streamType);
+    endTransaction(start);
   }
 
-  private void closeStream(EditLogOutputStream eStream) throws IOException {
-    eStream.setReadyToFlush();
-    eStream.flush();
-    eStream.close();
-  }
 
   void incrementCheckpointTime() {
     storage.incrementCheckpointTime();
@@ -1151,6 +942,7 @@ public class FSEditLog implements NNStor
   }
 
   synchronized void releaseBackupStream(NamenodeRegistration registration) {
+    /*
     Iterator<EditLogOutputStream> it =
                                   getOutputStreamIterator(JournalType.BACKUP);
     ArrayList<EditLogOutputStream> errorStreams = null;
@@ -1167,11 +959,14 @@ public class FSEditLog implements NNStor
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnStreams(errorStreams);
+    disableAndReportErrorOnJournals(errorStreams);
+    TODO BN currently disabled
+    */
   }
 
   synchronized boolean checkBackupRegistration(
       NamenodeRegistration registration) {
+    /*
     Iterator<EditLogOutputStream> it =
                                   getOutputStreamIterator(JournalType.BACKUP);
     boolean regAllowed = !it.hasNext();
@@ -1194,8 +989,12 @@ public class FSEditLog implements NNStor
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnStreams(errorStreams);
+    disableAndReportErrorOnJournals(errorStreams);
     return regAllowed;
+    
+    TODO BN currently disabled
+    */
+    return false;
   }
   
   static BytesWritable toBytesWritable(Options.Rename... options) {
@@ -1205,40 +1004,81 @@ public class FSEditLog implements NNStor
     }
     return new BytesWritable(bytes);
   }
+  
+  //// Iteration across journals
+  private interface JournalManagerClosure {
+    public void apply(JournalManager jm) throws IOException;
+  }
 
   /**
-   * Get the StorageDirectory for a stream
-   * @param es Stream whose StorageDirectory we wish to know
-   * @return the matching StorageDirectory
+   * Apply the given function across all of the journal managers, disabling
+   * any for which the closure throws an IOException.
+   * @param status message used for logging errors (e.g. "opening journal")
    */
-  StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
-    String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
-
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      FSNamesystem.LOG.info("comparing: " + parentStorageDir 
-                            + " and " + sd.getRoot().getAbsolutePath()); 
-      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
-        return sd;
+  private void mapJournalsAndReportErrors(
+      JournalManagerClosure closure, String status) {
+    ArrayList<JournalManager> badJournals = Lists.newArrayList();
+    for (JournalManager j : journals) {
+      try {
+        closure.apply(j);
+      } catch (IOException ioe) {
+        LOG.error("Error " + status + " (journal " + j + ")", ioe);
+        badJournals.add(j);
+      }
     }
-    return null;
+    disableAndReportErrorOnJournals(badJournals);
   }
+  
+  /**
+   * Called when some journals experience an error in some operation.
+   * This propagates errors to the storage level.
+   */
+  void disableAndReportErrorOnJournals(List<JournalManager> badJournals) {
+    if (badJournals == null || badJournals.isEmpty()) {
+      return; // nothing to do
+    }
 
-  private synchronized void disableStream(EditLogOutputStream stream) {
-    try { stream.close(); } catch (IOException e) {
-      // nothing to do.
-      LOG.warn("Failed to close eStream " + stream.getName()
-               + " before removing it (might be ok)");
+    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
+    for (JournalManager j : badJournals) {
+      LOG.error("Disabling journal " + j);
+      StorageDirectory sd = j.getStorageDirectory();
+      if (sd != null) {
+        errorDirs.add(sd);
+        // We will report this error to storage, which will propagate back
+        // to our listener interface, at which point we'll mark it faulty.
+      } else {
+        // Just mark it faulty ourselves, since it's not associated with a
+        // storage directory.
+        markJournalFaulty(j);
+      }
     }
-    editStreams.remove(stream);
+    
+    try {
+      storage.reportErrorsOnDirectories(errorDirs);
+    } catch (IOException ioe) {
+      LOG.error("Problem reporting error on directories ", ioe);
+    }
+  }
+ 
+  private synchronized void markJournalFaulty(JournalManager journal) {
+    try {
+      journal.abort();
+    } catch (IOException e) {
+      LOG.warn("Failed to abort faulty journal " + journal
+          + " before removing it (might be OK)", e);
+    }
+    journals.remove(journal);
+    faultyJournals.add(journal);
 
-    if (editStreams.size() <= 0) {
-      String msg = "Fatal Error: All storage directories are inaccessible.";
+    if (journals.isEmpty()) {
+      String msg = "Fatal Error: All journals are inaccessible.";
       LOG.fatal(msg, new IOException(msg));
       Runtime.getRuntime().exit(-1);
     }
   }
 
+
+
   /**
    * Error Handling on a storageDirectory
    *
@@ -1247,22 +1087,18 @@ public class FSEditLog implements NNStor
   @Override // NNStorageListener
   public synchronized void errorOccurred(StorageDirectory sd)
       throws IOException {
-    ArrayList<EditLogOutputStream> errorStreams
-      = new ArrayList<EditLogOutputStream>();
-
-    for (EditLogOutputStream eStream : editStreams) {
-      LOG.error("Unable to log edits to " + eStream.getName()
-                + "; removing it");
-
-      StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
-      if (sd == streamStorageDir) {
-        errorStreams.add(eStream);
+    
+    LOG.debug("Error occurred on " + sd);
+    
+    for (JournalManager jm : journals) {
+      if (jm.getStorageDirectory() == sd) {
+        LOG.warn("Marking corresponding journal " + jm + " faulty");
+        markJournalFaulty(jm);
+        return;
       }
     }
-
-    for (EditLogOutputStream eStream : errorStreams) {
-      disableStream(eStream);
-    }
+    
+    LOG.debug("Faulty " + sd + " did not correspond to any live journal manager.");
   }
 
   @Override // NNStorageListener
@@ -1276,9 +1112,26 @@ public class FSEditLog implements NNStor
   @Override // NNStorageListener
   public synchronized void directoryAvailable(StorageDirectory sd)
       throws IOException {
-    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-      File eFile = getEditFile(sd);
-      addNewEditLogStream(eFile);
+    for (JournalManager jm : journals) {
+      assert (jm.getStorageDirectory() != sd) :
+        "Storage directory " + sd + " being restored but wasn't marked faulty";
+    }
+
+    for (Iterator<JournalManager> iter = faultyJournals.iterator();
+         iter.hasNext();) {
+      JournalManager jm = iter.next();
+      if (jm.getStorageDirectory() == sd) {
+        try {
+          jm.restore();
+          iter.remove();
+          journals.add(jm);
+        } catch (IOException ioe) {
+          // TODO make sure this code is covered by unit tests!
+          LOG.error("Unable to restore storage directory " + sd, ioe);
+          storage.reportErrorsOnDirectory(sd);
+        }
+        break;
+      }
     }
   }
 }

Added: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1089737&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
(added)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
Thu Apr  7 04:46:32 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+/**
+ * Journal manager for the common case of edits files being written
+ * to a storage directory.
+ *
+ * Each instance is bound to one StorageDirectory and holds at most one
+ * open EditLogFileOutputStream at a time.
+ */
+public class FileJournalManager implements JournalManager {
+
+  /** Storage directory that holds the edits files of this journal. */
+  private final StorageDirectory sd;
+  /** Stream currently receiving edits, or null when no stream is open. */
+  private EditLogFileOutputStream currentStream;
+  /** Buffer size passed to every EditLogFileOutputStream created here. */
+  private int sizeOutputFlushBuffer = 512*1024;
+
+  /**
+   * @param sd the storage directory this journal writes its edits files to
+   */
+  public FileJournalManager(StorageDirectory sd) {
+    this.sd = sd;
+  }
+
+  /**
+   * Open a stream on the edits file of the storage directory.
+   * Must not be called while another stream is still open.
+   */
+  @Override
+  public void open() throws IOException {
+    assert currentStream == null;
+    File eFile = NNStorage.getEditFile(sd);
+
+    currentStream = new EditLogFileOutputStream(
+        eFile, sizeOutputFlushBuffer);
+  }
+
+  /**
+   * Close the current stream, if any, skipping the explicit flush that
+   * {@link #close()} performs.
+   */
+  @Override
+  public void abort() throws IOException {
+    if (currentStream != null) {
+      currentStream.close();
+      currentStream = null;
+    }
+  }
+
+  /**
+   * Flush and close the current stream. Does nothing when no stream is
+   * open, consistent with {@link #abort()}, so that closing a journal that
+   * was already aborted does not fail with a NullPointerException.
+   */
+  @Override
+  public void close() throws IOException {
+    if (currentStream == null) {
+      return; // nothing is open, e.g. the journal was aborted earlier
+    }
+    currentStream.setReadyToFlush();
+    currentStream.flush();
+    currentStream.close();
+    currentStream = null;
+  }
+
+  /**
+   * Re-open the stream on the edits file. The journal is expected to have
+   * been aborted first, so that no stream is open.
+   */
+  @Override
+  public void restore() throws IOException {
+    assert currentStream == null :
+      "Should have been aborted before restoring! " +
+      "Current stream: " + currentStream;
+    open();
+  }
+
+  /**
+   * Close the current stream and switch to a new stream on another file.
+   * @param dest name of the new file, resolved against the root of the
+   *             storage directory
+   */
+  @Override
+  public void divertFileStreams(String dest) throws IOException {
+    // close old stream
+    close();
+    // create new stream
+    currentStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
+        sizeOutputFlushBuffer);
+    currentStream.create();
+  }
+
+  /**
+   * Close the current stream, rename the given file onto the edits file
+   * if the given file exists, and open a new stream on the edits file.
+   * @param source name of the file to rename, resolved against the root
+   *               of the storage directory
+   * @throws IOException if the file cannot be renamed onto the edits file,
+   *                     or if closing or opening a stream fails
+   */
+  @Override
+  public void revertFileStreams(String source) throws IOException {
+    // close old stream
+    close();
+
+    // rename edits.new to edits
+    File editFile = NNStorage.getEditFile(sd);
+    File prevEditFile = new File(sd.getRoot(), source);
+    if(prevEditFile.exists()) {
+      if(!prevEditFile.renameTo(editFile)) {
+        //
+        // renameTo() fails on Windows if the destination
+        // file exists.
+        //
+        if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
+          throw new IOException("Rename failed for " + sd.getRoot());
+        }
+      }
+    }
+    // open new stream
+    currentStream = new EditLogFileOutputStream(editFile, sizeOutputFlushBuffer);
+  }
+
+  /**
+   * @return the storage directory this journal was constructed with
+   */
+  @Override
+  public StorageDirectory getStorageDirectory() {
+    return sd;
+  }
+
+  /**
+   * @return the stream currently open for edits, or null if none is open
+   */
+  @Override
+  public EditLogOutputStream getCurrentStream() {
+    return currentStream;
+  }
+
+  /**
+   * Set the buffer size used for streams created after this call; a stream
+   * that is already open is not affected.
+   */
+  @Override
+  public void setBufferCapacity(int size) {
+    this.sizeOutputFlushBuffer = size;
+  }
+
+  /**
+   * Replace the current stream, allowing tests to inject e.g. a spy.
+   */
+  void setCurrentStreamForTests(EditLogFileOutputStream injectedStream) {
+    this.currentStream = injectedStream;
+  }
+}

Added: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1089737&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
(added)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
Thu Apr  7 04:46:32 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+/**
+ * A JournalManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this interface, which is created when the EditLog is first opened.
+ */
+public interface JournalManager {
+
+  /**
+   * @return the StorageDirectory associated with this journal,
+   * or null if this is not a disk-based journal
+   */
+  StorageDirectory getStorageDirectory();
+
+  /**
+   * @return the current stream to which to direct edits (FileJournalManager returns null when no stream is open)
+   */
+  EditLogOutputStream getCurrentStream();
+
+  /**
+   * Prepare the stream to write edits.
+   */
+  void open() throws IOException;
+  
+  /**
+   * Close the currently open stream cleanly.
+   */
+  void close() throws IOException;
+  
+  /**
+   * Abort the currently open stream in the case that the storage has
+   * noticed that the directory is no longer valid.
+   */
+  void abort() throws IOException;
+  
+  /**
+   * Re-open the stream when the underlying storage directory has been
+   * deemed recovered.
+   */
+  void restore() throws IOException;  
+
+  /**
+   * Divert streams to dest, e.g. "edits.new". TODO: this will go away with HDFS-1073.
+   */
+  void divertFileStreams(String dest) throws IOException;
+
+  /**
+   * Revert streams, e.g. by renaming source ("edits.new") back to "edits".
+   * TODO: this will go away with HDFS-1073.
+   */
+  void revertFileStreams(String source) throws IOException;
+
+  /**
+   * Set the amount of memory (size, presumably in bytes; TODO confirm) that this stream should use to buffer edits.
+   */
+  void setBufferCapacity(int size);
+}

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1089737&r1=1089736&r2=1089737&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
Thu Apr  7 04:46:32 2011
@@ -754,14 +754,14 @@ public class NNStorage extends Storage i
   /**
    * @return A editlog File in storage directory 'sd'.
    */
-  File getEditFile(StorageDirectory sd) {
+  static File getEditFile(StorageDirectory sd) {
     return getStorageFile(sd, NameNodeFile.EDITS);
   }
 
   /**
    * @return A temporary editlog File in storage directory 'sd'.
    */
-  File getEditNewFile(StorageDirectory sd) {
+  static File getEditNewFile(StorageDirectory sd) {
     return getStorageFile(sd, NameNodeFile.EDITS_NEW);
   }
 

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1089737&r1=1089736&r2=1089737&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
Thu Apr  7 04:46:32 2011
@@ -341,9 +341,11 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      ArrayList<EditLogOutputStream> streams = editLog.getEditStreams();
-      EditLogOutputStream spyElos = spy(streams.get(0));
-      streams.set(0, spyElos);
+      FileJournalManager journal = (FileJournalManager)
+          editLog.getJournals().get(0);
+      EditLogFileOutputStream spyElos =
+          spy((EditLogFileOutputStream)journal.getCurrentStream());
+      journal.setCurrentStreamForTests(spyElos);
 
       final AtomicReference<Throwable> deferredException =
           new AtomicReference<Throwable>();



Mime
View raw message