hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1297856 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/namenode/
Date Wed, 07 Mar 2012 06:15:36 GMT
Author: suresh
Date: Wed Mar  7 06:15:35 2012
New Revision: 1297856

URL: http://svn.apache.org/viewvc?rev=1297856&view=rev
Log:
HDFS-2158. Merging change r1177473 from trunk to 0.23
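
The heart of this change: FSEditLog's hand-rolled per-journal bookkeeping (JournalAndStream lists walked by JournalClosure callbacks) moves into the new JournalSet, which fans each edit out to all configured journals and surfaces an error only once every journal has failed. A rough standalone sketch of that fan-out pattern follows; the names are illustrative only, since the real JournalSet.java is copied unchanged from trunk and does not appear in this diff:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative stand-in for the JournalSet fan-out pattern. */
    class JournalFanOut {
      interface Journal {
        void write(String op) throws IOException;
      }

      private final List<Journal> healthy = new ArrayList<Journal>();

      void add(Journal j) {
        healthy.add(j);
      }

      /** Write to every journal; disable those that fail; throw only if none survive. */
      void writeToAll(String op) throws IOException {
        List<Journal> bad = new ArrayList<Journal>();
        for (Journal j : healthy) {
          try {
            j.write(op);
          } catch (IOException e) {
            bad.add(j); // this journal is disabled, but the edit is still durable elsewhere
          }
        }
        healthy.removeAll(bad);
        if (healthy.isEmpty()) {
          throw new IOException("All journals failed writing: " + op);
        }
      }
    }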

Added:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
      - copied unchanged from r1177473, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Mar  7 06:15:35 2012
@@ -98,6 +98,8 @@ Release 0.23.3 - UNRELEASED
     HDFS-3030. Remove getProtocolVersion and getProtocolSignature from 
     translators. (jitendra)
 
+    HDFS-2158. Add JournalSet to manage the set of journals. (jitendra)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Wed Mar  7 06:15:35 2012
@@ -54,7 +54,6 @@ class EditLogBackupOutputStream extends 
     this.nnRegistration = nnReg;
     InetSocketAddress bnAddress =
       NetUtils.createSocketAddr(bnRegistration.getAddress());
-    Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
           new JournalProtocolTranslatorPB(bnAddress, new HdfsConfiguration());
@@ -66,16 +65,6 @@ class EditLogBackupOutputStream extends 
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
   
-  @Override // JournalStream
-  public String getName() {
-    return bnRegistration.getAddress();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.BACKUP;
-  }
-
   @Override // EditLogOutputStream
   void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
@@ -141,16 +130,6 @@ class EditLogBackupOutputStream extends 
   }
 
   /**
-   * There is no persistent storage. Therefore length is 0.<p>
-   * Length is used to check when it is large enough to start a checkpoint.
-   * This criteria should not be used for backup streams.
-   */
-  @Override // EditLogOutputStream
-  long length() throws IOException {
-    return 0;
-  }
-
-  /**
    * Get backup node registration.
    */
   NamenodeRegistration getRegistration() {

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Mar  7 06:15:35 2012
@@ -37,9 +37,7 @@ import com.google.common.annotations.Vis
  * stores edits in a local file.
  */
 class EditLogFileOutputStream extends EditLogOutputStream {
-  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);;
-
-  private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
+  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
 
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
@@ -73,16 +71,6 @@ class EditLogFileOutputStream extends Ed
     fc.position(fc.size());
   }
 
-  @Override // JournalStream
-  public String getName() {
-    return file.getPath();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.FILE;
-  }
-
   @Override
   void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
@@ -175,7 +163,10 @@ class EditLogFileOutputStream extends Ed
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
-    
+    if (doubleBuf.isFlushed()) {
+      LOG.info("Nothing to flush");
+      return;
+    }
     preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
@@ -189,16 +180,6 @@ class EditLogFileOutputStream extends Ed
   public boolean shouldForceSync() {
     return doubleBuf.shouldForceSync();
   }
-  
-  /**
-   * Return the size of the current edit log including buffered data.
-   */
-  @Override
-  long length() throws IOException {
-    // file size - header size + size of both buffers
-    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + 
-      doubleBuf.countBufferedBytes();
-  }
 
   // allocate a big chunk of data
   private void preallocate() throws IOException {

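The new early return in flushAndSync() above leans on the double buffer: when doubleBuf reports isFlushed(), there are no pending edits, so the preallocate/flush/force sequence can be skipped. For readers unfamiliar with the scheme, here is a minimal sketch of the two-buffer swap; it is a simplified stand-in for this codebase's EditsDoubleBuffer, not its actual code:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    /** Simplified sketch of the double-buffer idea behind doubleBuf. */
    class DoubleBufferSketch {
      private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(); // receives new edits
      private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();   // drained by flush

      void writeOp(byte[] serializedOp) {
        bufCurrent.write(serializedOp, 0, serializedOp.length);
      }

      /** Swap buffers so writers keep appending while bufReady is flushed. */
      void setReadyToFlush() {
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
      }

      boolean isFlushed() {
        // Nothing pending in either buffer: flushAndSync may return early.
        return bufCurrent.size() == 0 && bufReady.size() == 0;
      }

      void flushTo(OutputStream out) throws IOException {
        bufReady.writeTo(out);
        bufReady.reset();
      }
    }
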
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Wed Mar  7 06:15:35 2012
@@ -18,18 +18,20 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
+
 /**
  * A generic abstract class to support journaling of edits logs into 
  * a persistent storage.
  */
-abstract class EditLogOutputStream implements JournalStream {
+abstract class EditLogOutputStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
 
-  EditLogOutputStream() throws IOException {
+  EditLogOutputStream() {
     numSync = totalTimeSync = 0;
   }
 
@@ -101,12 +103,6 @@ abstract class EditLogOutputStream imple
   }
 
   /**
-   * Return the size of the current edits log.
-   * Length is used to check when it is large enough to start a checkpoint.
-   */
-  abstract long length() throws IOException;
-
-  /**
    * Implement the policy when to automatically sync the buffered edits log
    * The buffered edits can be flushed when the buffer becomes full or
    * a certain period of time is elapsed.
@@ -127,12 +123,7 @@ abstract class EditLogOutputStream imple
   /**
    * Return number of calls to {@link #flushAndSync()}
    */
-  long getNumSync() {
+  protected long getNumSync() {
     return numSync;
   }
-
-  @Override // Object
-  public String toString() {
-    return getName();
-  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Mar  7 06:15:35 2012
@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,25 +34,17 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -62,9 +54,6 @@ import org.apache.hadoop.hdfs.server.nam
 @InterfaceStability.Evolving
 public class FSEditLog  {
 
-  static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
-      " File system changes are not persistent. No journal streams.";
-
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
   /**
@@ -82,10 +71,11 @@ public class FSEditLog  {
     CLOSED;
   }  
   private State state = State.UNINITIALIZED;
+  
+  //initialize
+  final private JournalSet journalSet;
+  private EditLogOutputStream editLogStream = null;
 
-
-  private List<JournalAndStream> journals = Lists.newArrayList();
-    
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
 
@@ -137,15 +127,15 @@ public class FSEditLog  {
     this.storage = storage;
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
-    
+
+    this.journalSet = new JournalSet();
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
-      journals.add(new JournalAndStream(new FileJournalManager(sd)));
+      journalSet.add(new FileJournalManager(sd));
     }
     
-    if (journals.isEmpty()) {
+    if (journalSet.isEmpty()) {
       LOG.error("No edits directories configured!");
-    }
-    
+    } 
     state = State.BETWEEN_LOG_SEGMENTS;
   }
   
@@ -172,9 +162,8 @@ public class FSEditLog  {
       LOG.debug("Closing log when already closed");
       return;
     }
-    
     if (state == State.IN_SEGMENT) {
-      assert !journals.isEmpty();
+      assert editLogStream != null;
       waitForSyncToFinish();
       endCurrentLogSegment(true);
     }
@@ -193,20 +182,14 @@ public class FSEditLog  {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
       
-      if (journals.isEmpty()) {
-        throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      }
-      
       long start = beginTransaction();
       op.setTransactionId(txid);
 
-      mapJournalsAndReportErrors(new JournalClosure() {
-        @Override 
-        public void apply(JournalAndStream jas) throws IOException {
-          if (!jas.isActive()) return;
-          jas.stream.write(op);
-        }
-      }, "logging edit");
+      try {
+        editLogStream.write(op);
+      } catch (IOException ex) {
+        // All journals failed, it is handled in logSync.
+      }
 
       endTransaction(start);
       
@@ -251,14 +234,7 @@ public class FSEditLog  {
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-
-      if (jas.getCurrentStream().shouldForceSync()) {
-        return true;
-      }
-    }
-    return false;
+    return editLogStream.shouldForceSync();
   }
   
   private long beginTransaction() {
@@ -322,7 +298,7 @@ public class FSEditLog  {
    * NOTE: this should be done while holding the FSNamesystem lock, or
    * else more operations can start writing while this is in progress.
    */
-  void logSyncAll() throws IOException {
+  void logSyncAll() {
     // Record the most recent transaction ID as our own id
     synchronized (this) {
       TransactionId id = myTransactionId.get();
@@ -366,74 +342,73 @@ public class FSEditLog  {
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
     
-    List<JournalAndStream> candidateJournals =
-      Lists.newArrayListWithCapacity(journals.size());
-    List<JournalAndStream> badJournals = Lists.newArrayList();
-    
     boolean sync = false;
     try {
+      EditLogOutputStream logStream = null;
       synchronized (this) {
         try {
-        printStatistics(false);
-  
-        // if somebody is already syncing, then wait
-        while (mytxid > synctxid && isSyncRunning) {
-          try {
-            wait(1000);
-          } catch (InterruptedException ie) { 
+          printStatistics(false);
+
+          // if somebody is already syncing, then wait
+          while (mytxid > synctxid && isSyncRunning) {
+            try {
+              wait(1000);
+            } catch (InterruptedException ie) {
+            }
           }
-        }
   
-        //
-        // If this transaction was already flushed, then nothing to do
-        //
-        if (mytxid <= synctxid) {
-          numTransactionsBatchedInSync++;
-          if (metrics != null) // Metrics is non-null only when used inside name node
-            metrics.incrTransactionsBatchedInSync();
-          return;
-        }
+          //
+          // If this transaction was already flushed, then nothing to do
+          //
+          if (mytxid <= synctxid) {
+            numTransactionsBatchedInSync++;
+            if (metrics != null) {
+              // Metrics is non-null only when used inside name node
+              metrics.incrTransactionsBatchedInSync();
+            }
+            return;
+          }
      
-        // now, this thread will do the sync
-        syncStart = txid;
-        isSyncRunning = true;
-        sync = true;
-  
-        // swap buffers
-        assert !journals.isEmpty() : "no editlog streams";
-        
-        for (JournalAndStream jas : journals) {
-          if (!jas.isActive()) continue;
+          // now, this thread will do the sync
+          syncStart = txid;
+          isSyncRunning = true;
+          sync = true;
+  
+          // swap buffers
           try {
-            jas.getCurrentStream().setReadyToFlush();
-            candidateJournals.add(jas);
-          } catch (IOException ie) {
-            LOG.error("Unable to get ready to flush.", ie);
-            badJournals.add(jas);
+            if (journalSet.isEmpty()) {
+              throw new IOException("No journals available to flush");
+            }
+            editLogStream.setReadyToFlush();
+          } catch (IOException e) {
+            LOG.fatal("Could not sync any journal to persistent storage. "
+                + "Unsynced transactions: " + (txid - synctxid),
+                new Exception());
+            runtime.exit(1);
           }
-        }
         } finally {
           // Prevent RuntimeException from blocking other log edit write 
           doneWithAutoSyncScheduling();
         }
+        //editLogStream may become null,
+        //so store a local variable for flush.
+        logStream = editLogStream;
       }
-  
+      
       // do the sync
       long start = now();
-      for (JournalAndStream jas : candidateJournals) {
-        if (!jas.isActive()) continue;
-        try {
-          jas.getCurrentStream().flush();
-        } catch (IOException ie) {
-          LOG.error("Unable to sync edit log.", ie);
-          //
-          // remember the streams that encountered an error.
-          //
-          badJournals.add(jas);
+      try {
+        if (logStream != null) {
+          logStream.flush();
+        }
+      } catch (IOException ex) {
+        synchronized (this) {
+          LOG.fatal("Could not sync any journal to persistent storage. "
+              + "Unsynced transactions: " + (txid - synctxid), new Exception());
+          runtime.exit(1);
         }
       }
       long elapsed = now() - start;
-      disableAndReportErrorOnJournals(badJournals);
   
       if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
@@ -443,13 +418,6 @@ public class FSEditLog  {
       // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
         if (sync) {
-          if (badJournals.size() >= journals.size()) {
-            LOG.fatal("Could not sync any journal to persistent storage. " +
-                "Unsynced transactions: " + (txid - synctxid),
-                new Exception());
-            runtime.exit(1);
-          }
-
           synctxid = syncStart;
           isSyncRunning = false;
         }
@@ -466,9 +434,6 @@ public class FSEditLog  {
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (journals.isEmpty()) {
-      return;
-    }
     lastPrintTime = now;
     StringBuilder buf = new StringBuilder();
     buf.append("Number of transactions: ");
@@ -478,20 +443,9 @@ public class FSEditLog  {
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      buf.append(jas.getCurrentStream().getNumSync());
-      break;
-    }
-
+    buf.append(editLogStream.getNumSync());
     buf.append(" SyncTimes(ms): ");
-
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      EditLogOutputStream eStream = jas.getCurrentStream();
-      buf.append(eStream.getTotalSyncTime());
-      buf.append(" ");
-    }
+    buf.append(journalSet.getSyncTimes());
     LOG.info(buf);
   }
 
@@ -664,7 +618,6 @@ public class FSEditLog  {
    * log delegation token to edit log
    * @param id DelegationTokenIdentifier
    * @param expiryTime of the token
-   * @return
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
@@ -703,24 +656,11 @@ public class FSEditLog  {
   }
   
   /**
-   * @return the number of active (non-failed) journals
-   */
-  private int countActiveJournals() {
-    int count = 0;
-    for (JournalAndStream jas : journals) {
-      if (jas.isActive()) {
-        count++;
-      }
-    }
-    return count;
-  }
-  
-  /**
    * Used only by unit tests.
    */
   @VisibleForTesting
   List<JournalAndStream> getJournals() {
-    return journals;
+    return journalSet.getAllJournalStreams();
   }
   
   /**
@@ -742,62 +682,9 @@ public class FSEditLog  {
   /**
    * Return a manifest of what finalized edit logs are available
    */
-  public synchronized RemoteEditLogManifest getEditLogManifest(
-      long fromTxId) throws IOException {
-    // Collect RemoteEditLogs available from each FileJournalManager
-    List<RemoteEditLog> allLogs = Lists.newArrayList();
-    for (JournalAndStream j : journals) {
-      if (j.getManager() instanceof FileJournalManager) {
-        FileJournalManager fjm = (FileJournalManager)j.getManager();
-        try {
-          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
-        } catch (Throwable t) {
-          LOG.warn("Cannot list edit logs in " + fjm, t);
-        }
-      }
-    }
-    
-    // Group logs by their starting txid
-    ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
-      Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
-    long curStartTxId = fromTxId;
-
-    List<RemoteEditLog> logs = Lists.newArrayList();
-    while (true) {
-      ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
-      if (logGroup.isEmpty()) {
-        // we have a gap in logs - for example because we recovered some old
-        // storage directory with ancient logs. Clear out any logs we've
-        // accumulated so far, and then skip to the next segment of logs
-        // after the gap.
-        SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
-        startTxIds = startTxIds.tailSet(curStartTxId);
-        if (startTxIds.isEmpty()) {
-          break;
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found gap in logs at " + curStartTxId + ": " +
-                "not returning previous logs in manifest.");
-          }
-          logs.clear();
-          curStartTxId = startTxIds.first();
-          continue;
-        }
-      }
-
-      // Find the one that extends the farthest forward
-      RemoteEditLog bestLog = Collections.max(logGroup);
-      logs.add(bestLog);
-      // And then start looking from after that point
-      curStartTxId = bestLog.getEndTxId() + 1;
-    }
-    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated manifest for logs since " + fromTxId + ":"
-          + ret);      
-    }
-    return ret;
+  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
+      throws IOException {
+    return journalSet.getEditLogManifest(fromTxId);
   }
  
   /**
@@ -840,14 +727,9 @@ public class FSEditLog  {
     // See HDFS-2174.
     storage.attemptRestoreRemovedStorage();
     
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.startLogSegment(segmentTxId);
-      }
-    }, "starting log segment " + segmentTxId);
-
-    if (countActiveJournals() == 0) {
+    try {
+      editLogStream = journalSet.startLogSegment(segmentTxId);
+    } catch (IOException ex) {
       throw new IOException("Unable to start log segment " +
           segmentTxId + ": no journals successfully started.");
     }
@@ -881,14 +763,12 @@ public class FSEditLog  {
     
     final long lastTxId = getLastWrittenTxId();
     
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.close(lastTxId);
-        }
-      }
-    }, "ending log segment");
+    try {
+      journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
+      editLogStream = null;
+    } catch (IOException e) {
+      //All journals have failed, it will be handled in logSync.
+    }
     
     state = State.BETWEEN_LOG_SEGMENTS;
   }
@@ -897,14 +777,15 @@ public class FSEditLog  {
    * Abort all current logs. Called from the backup node.
    */
   synchronized void abortCurrentLogSegment() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-      
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.abort();
+    try {
+      //Check for null, as abort can be called any time.
+      if (editLogStream != null) {
+        editLogStream.abort();
+        editLogStream = null;
       }
-    }, "aborting all streams");
-    state = State.BETWEEN_LOG_SEGMENTS;
+    } catch (IOException e) {
+      LOG.warn("All journals failed to abort", e);
+    }
   }
 
   /**
@@ -920,13 +801,12 @@ public class FSEditLog  {
         "cannot purge logs older than txid " + minTxIdToKeep +
         " when current segment starts at " + curSegmentTxId;
     }
-    
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.manager.purgeLogsOlderThan(minTxIdToKeep);
-      }
-    }, "purging logs older than " + minTxIdToKeep);
+
+    try {
+      journalSet.purgeLogsOlderThan(minTxIdToKeep);
+    } catch (IOException ex) {
+      //All journals have failed, it will be handled in logSync.
+    }
   }
 
   
@@ -954,9 +834,7 @@ public class FSEditLog  {
 
   // sets the initial capacity of the flush buffer.
   public void setOutputBufferCapacity(int size) {
-    for (JournalAndStream jas : journals) {
-      jas.manager.setOutputBufferCapacity(size);
-    }
+      journalSet.setOutputBufferCapacity(size);
   }
 
   /**
@@ -977,7 +855,7 @@ public class FSEditLog  {
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
     
-    JournalAndStream jas = findBackupJournalAndStream(bnReg);
+    JournalManager jas = findBackupJournal(bnReg);
     if (jas != null) {
       // already registered
       LOG.info("Backup node " + bnReg + " re-registers");
@@ -986,35 +864,29 @@ public class FSEditLog  {
     
     LOG.info("Registering new backup node: " + bnReg);
     BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
-    journals.add(new JournalAndStream(bjm));
+    journalSet.add(bjm);
   }
   
-  synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    for (Iterator<JournalAndStream> iter = journals.iterator();
-         iter.hasNext();) {
-      JournalAndStream jas = iter.next();
-      if (jas.manager instanceof BackupJournalManager &&
-          ((BackupJournalManager)jas.manager).matchesRegistration(
-              registration)) {
-        jas.abort();        
-        LOG.info("Removing backup journal " + jas);
-        iter.remove();
-      }
+  synchronized void releaseBackupStream(NamenodeRegistration registration)
+      throws IOException {
+    BackupJournalManager bjm = this.findBackupJournal(registration);
+    if (bjm != null) {
+      LOG.info("Removing backup journal " + bjm);
+      journalSet.remove(bjm);
     }
   }
   
   /**
    * Find the JournalAndStream associated with this BackupNode.
+   * 
    * @return null if it cannot be found
    */
-  private synchronized JournalAndStream findBackupJournalAndStream(
+  private synchronized BackupJournalManager findBackupJournal(
       NamenodeRegistration bnReg) {
-    for (JournalAndStream jas : journals) {
-      if (jas.manager instanceof BackupJournalManager) {
-        BackupJournalManager bjm = (BackupJournalManager)jas.manager;
-        if (bjm.matchesRegistration(bnReg)) {
-          return jas;
-        }
+    for (JournalManager bjm : journalSet.getJournalManagers()) {
+      if ((bjm instanceof BackupJournalManager)
+          && ((BackupJournalManager) bjm).matchesRegistration(bnReg)) {
+        return (BackupJournalManager) bjm;
       }
     }
     return null;
@@ -1026,124 +898,24 @@ public class FSEditLog  {
    */   
   synchronized void logEdit(final int length, final byte[] data) {
     long start = beginTransaction();
-    
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
-        }
-      }      
-    }, "Logging edit");
-
-    endTransaction(start);
-  }
 
-  //// Iteration across journals
-  private interface JournalClosure {
-    public void apply(JournalAndStream jas) throws IOException;
-  }
-
-  /**
-   * Apply the given function across all of the journal managers, disabling
-   * any for which the closure throws an IOException.
-   * @param status message used for logging errors (e.g. "opening journal")
-   */
-  private void mapJournalsAndReportErrors(
-      JournalClosure closure, String status) {
-    List<JournalAndStream> badJAS = Lists.newLinkedList();
-    for (JournalAndStream jas : journals) {
-      try {
-        closure.apply(jas);
-      } catch (Throwable t) {
-        LOG.error("Error " + status + " (journal " + jas + ")", t);
-        badJAS.add(jas);
-      }
-    }
-
-    disableAndReportErrorOnJournals(badJAS);
-  }
-  
-  /**
-   * Called when some journals experience an error in some operation.
-   * This propagates errors to the storage level.
-   */
-  private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
-    if (badJournals == null || badJournals.isEmpty()) {
-      return; // nothing to do
-    }
- 
-    for (JournalAndStream j : badJournals) {
-      LOG.error("Disabling journal " + j);
-      j.abort();
-    }
-  }
-
-  /**
-   * Find the best editlog input stream to read from txid. In this case
-   * best means the editlog which has the largest continuous range of 
-   * transactions starting from the transaction id, fromTxId.
-   *
-   * If a journal throws an CorruptionException while reading from a txn id,
-   * it means that it has more transactions, but can't find any from fromTxId. 
-   * If this is the case and no other journal has transactions, we should throw
-   * an exception as it means more transactions exist, we just can't load them.
-   *
-   * @param fromTxId Transaction id to start from.
-   * @return a edit log input stream with tranactions fromTxId 
-   *         or null if no more exist
-   */
-  private EditLogInputStream selectStream(long fromTxId) 
-      throws IOException {
-    JournalManager bestjm = null;
-    long bestjmNumTxns = 0;
-    CorruptionException corruption = null;
-
-    for (JournalAndStream jas : journals) {
-      JournalManager candidate = jas.getManager();
-      long candidateNumTxns = 0;
-      try {
-        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
-      } catch (CorruptionException ce) {
-        corruption = ce;
-      } catch (IOException ioe) {
-        LOG.warn("Error reading number of transactions from " + candidate);
-        continue; // error reading disk, just skip
-      }
-      
-      if (candidateNumTxns > bestjmNumTxns) {
-        bestjm = candidate;
-        bestjmNumTxns = candidateNumTxns;
-      }
-    }
-    
-    
-    if (bestjm == null) {
-      /**
-       * If all candidates either threw a CorruptionException or
-       * found 0 transactions, then a gap exists. 
-       */
-      if (corruption != null) {
-        throw new IOException("Gap exists in logs from " 
-                              + fromTxId, corruption);
-      } else {
-        return null;
-      }
+    try {
+      editLogStream.writeRaw(data, 0, length);
+    } catch (IOException ex) {
+      // All journals have failed, it will be handled in logSync.
     }
-
-    return bestjm.getInputStream(fromTxId);
+    endTransaction(start);
   }
 
   /**
    * Run recovery on all journals to recover any unclosed segments
    */
   void recoverUnclosedStreams() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-        @Override
-        public void apply(JournalAndStream jas) throws IOException {
-          jas.manager.recoverUnfinalizedSegments();
-        }
-      }, "recovering unclosed streams");
+    try {
+      journalSet.recoverUnfinalizedSegments();
+    } catch (IOException ex) {
+      // All journals have failed, it is handled in logSync.
+    }
   }
 
   /**
@@ -1151,23 +923,16 @@ public class FSEditLog  {
    * @param fromTxId first transaction in the selected streams
    * @param toAtLeast the selected streams must contain this transaction
    */
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId)
-      throws IOException {
-    List<EditLogInputStream> streams = Lists.newArrayList();
-    
-    boolean gapFound = false;
-    EditLogInputStream stream = selectStream(fromTxId);
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId) throws IOException {
+    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+    EditLogInputStream stream = journalSet.getInputStream(fromTxId);
     while (stream != null) {
       fromTxId = stream.getLastTxId() + 1;
       streams.add(stream);
-      try {
-        stream = selectStream(fromTxId);
-      } catch (IOException ioe) {
-        gapFound = true;
-        break;
-      }
+      stream = journalSet.getInputStream(fromTxId);
     }
-    if (fromTxId <= toAtLeastTxId || gapFound) {
+    if (fromTxId <= toAtLeastTxId) {
       closeAllStreams(streams);
       throw new IOException("No non-corrupt logs for txid " 
                             + fromTxId);
@@ -1184,75 +949,4 @@ public class FSEditLog  {
       IOUtils.closeStream(s);
     }
   }
-
-  /**
-   * Container for a JournalManager paired with its currently
-   * active stream.
-   * 
-   * If a Journal gets disabled due to an error writing to its
-   * stream, then the stream will be aborted and set to null.
-   */
-  static class JournalAndStream {
-    private final JournalManager manager;
-    private EditLogOutputStream stream;
-    private long segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-    
-    private JournalAndStream(JournalManager manager) {
-      this.manager = manager;
-    }
-
-    private void startLogSegment(long txId) throws IOException {
-      Preconditions.checkState(stream == null);
-      stream = manager.startLogSegment(txId);
-      segmentStartsAtTxId = txId;
-    }
-
-    private void close(long lastTxId) throws IOException {
-      Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
-          "invalid segment: lastTxId %s >= " +
-          "segment starting txid %s", lastTxId, segmentStartsAtTxId);
-          
-      if (stream == null) return;
-      stream.close();
-      manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
-      stream = null;
-    }
-    
-    @VisibleForTesting
-    void abort() {
-      if (stream == null) return;
-      try {
-        stream.abort();
-      } catch (IOException ioe) {
-        LOG.error("Unable to abort stream " + stream, ioe);
-      }
-      stream = null;
-      segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-    }
-
-    private boolean isActive() {
-      return stream != null;
-    }
-
-    @VisibleForTesting
-    EditLogOutputStream getCurrentStream() {
-      return stream;
-    }
-    
-    @Override
-    public String toString() {
-      return "JournalAndStream(mgr=" + manager +
-        ", " + "stream=" + stream + ")";
-    }
-
-    @VisibleForTesting
-    void setCurrentStreamForTests(EditLogOutputStream stream) {
-      this.stream = stream;
-    }
-    
-    @VisibleForTesting
-    JournalManager getManager() {
-      return manager;
-    }
-  }
 }

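Taken together, the FSEditLog changes above centralize failure handling: logEdit(), logEdit(length, data), finalizeLogSegment() and purgeLogsOlderThan() now deliberately swallow IOExceptions (per the repeated "All journals failed, it is handled in logSync" comments), and logSync() alone halts the NameNode when no journal remains writable. A condensed, hypothetical rendering of that contract; note the diff's runtime.exit(1) goes through an injectable wrapper so tests can intercept it (see assertExitInvocations below), whereas this sketch calls the JDK Runtime directly:

    import java.io.IOException;

    /** Hypothetical condensation of the new failure contract. */
    class CentralizedSyncSketch {
      interface Stream {
        void write(String op) throws IOException;
        void setReadyToFlush() throws IOException;
        void flush() throws IOException;
      }

      private final Stream editLogStream; // aggregated stream over all journals

      CentralizedSyncSketch(Stream s) {
        this.editLogStream = s;
      }

      void logEdit(String op) {
        try {
          editLogStream.write(op); // throws only when *all* journals have failed...
        } catch (IOException e) {
          // ...and even then the decision to halt is deferred to logSync()
        }
      }

      void logSync() {
        try {
          editLogStream.setReadyToFlush();
          editLogStream.flush();
        } catch (IOException e) {
          // No journal can persist edits any more; halting beats silent data loss.
          System.err.println("Could not sync any journal to persistent storage");
          Runtime.getRuntime().exit(1);
        }
      }
    }
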
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Wed Mar  7 06:15:35 2012
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFac
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Comparator;
 import java.util.Collections;
 import java.util.regex.Matcher;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Wed Mar  7 06:15:35 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 
+
 /**
  * A JournalManager is responsible for managing a single place of storing
  * edit logs. It may correspond to multiple files, a backup node, etc.

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java Wed Mar  7 06:15:35 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,7 +75,7 @@ public class TestEditLogJournalFailures 
   public void testSingleFailedEditsDirOnFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate one edits journal.
-    invalidateEditsDirAtIndex(0, true);
+    invalidateEditsDirAtIndex(0, true, false);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -86,8 +88,22 @@ public class TestEditLogJournalFailures 
   public void testAllEditsDirsFailOnFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate both edits journals.
-    invalidateEditsDirAtIndex(0, true);
-    invalidateEditsDirAtIndex(1, true);
+    invalidateEditsDirAtIndex(0, true, false);
+    invalidateEditsDirAtIndex(1, true, false);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // The previous edit could not be synced to any persistent storage, should
+    // have halted the NN.
+    assertExitInvocations(1);
+  }
+  
+  @Test
+  public void testAllEditsDirFailOnWrite() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate both edits journals.
+    invalidateEditsDirAtIndex(0, true, true);
+    invalidateEditsDirAtIndex(1, true, true);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -100,7 +116,7 @@ public class TestEditLogJournalFailures 
   public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate one edits journal.
-    invalidateEditsDirAtIndex(0, false);
+    invalidateEditsDirAtIndex(0, false, false);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -117,16 +133,18 @@ public class TestEditLogJournalFailures 
    * @return the original <code>EditLogOutputStream</code> of the journal.
    */
   private EditLogOutputStream invalidateEditsDirAtIndex(int index,
-      boolean failOnFlush) throws IOException {
+      boolean failOnFlush, boolean failOnWrite) throws IOException {
     FSImage fsimage = cluster.getNamesystem().getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
-    
 
-    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    JournalAndStream jas = editLog.getJournals().get(index);
     EditLogFileOutputStream elos =
       (EditLogFileOutputStream) jas.getCurrentStream();
     EditLogFileOutputStream spyElos = spy(elos);
-    
+    if (failOnWrite) {
+      doThrow(new IOException("fail on write()")).when(spyElos).write(
+          (FSEditLogOp) any());
+    }
     if (failOnFlush) {
       doThrow(new IOException("fail on flush()")).when(spyElos).flush();
     } else {
@@ -151,7 +169,7 @@ public class TestEditLogJournalFailures 
     FSImage fsimage = cluster.getNamesystem().getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
 
-    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    JournalAndStream jas = editLog.getJournals().get(index);
     jas.setCurrentStreamForTests(elos);
   }
 

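The new failOnWrite path in invalidateEditsDirAtIndex uses Mockito's spy/doThrow stubbing to make only write() fail on an otherwise real stream. A self-contained example of the same pattern, with hypothetical class names standing in for the edit-log types:

    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.spy;

    import java.io.IOException;

    /** Self-contained spy/doThrow example; Sink is a made-up stand-in. */
    public class SpyWriteFailureExample {
      static class Sink {
        void write(String s) throws IOException { /* real write happens here */ }
      }

      public static void main(String[] args) {
        Sink spySink = spy(new Sink());
        try {
          // Make write() throw while every other method keeps its real behavior.
          doThrow(new IOException("fail on write()")).when(spySink).write(anyString());
          spySink.write("edit");
        } catch (IOException expected) {
          System.out.println("got expected failure: " + expected.getMessage());
        }
      }
    }
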
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Wed Mar  7 06:15:35 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.log4j.Level;
 
@@ -356,7 +357,7 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      FSEditLog.JournalAndStream jas = editLog.getJournals().get(0);
+      JournalAndStream jas = editLog.getJournals().get(0);
       EditLogFileOutputStream spyElos =
           spy((EditLogFileOutputStream)jas.getCurrentStream());
       jas.setCurrentStreamForTests(spyElos);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java?rev=1297856&r1=1297855&r2=1297856&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java Wed Mar  7 06:15:35 2012
@@ -44,6 +44,8 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
+
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
@@ -120,7 +122,7 @@ public class TestStorageRestore {
     // simulate an error
     fi.getStorage().reportErrorsOnDirectories(al);
     
-    for (FSEditLog.JournalAndStream j : fi.getEditLog().getJournals()) {
+    for (JournalAndStream j : fi.getEditLog().getJournals()) {
       if (j.getManager() instanceof FileJournalManager) {
         FileJournalManager fm = (FileJournalManager)j.getManager();
         if (fm.getStorageDirectory().getRoot().equals(path2)


