hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1089992 - in /hadoop/hdfs/branches/HDFS-1073: ./ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Thu, 07 Apr 2011 20:30:16 GMT
Author: todd
Date: Thu Apr  7 20:30:15 2011
New Revision: 1089992

URL: http://svn.apache.org/viewvc?rev=1089992&view=rev
Log:
Revert HDFS-1799 from r1089737 - Ivan has split this work into three smaller patches to be
committed instead.

Removed:
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
Modified:
    hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt?rev=1089992&r1=1089991&r2=1089992&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt Thu Apr  7 20:30:15 2011
@@ -12,5 +12,3 @@ HDFS-1729. Add code to detect valid leng
 HDFS-1793. Add code to inspect a storage directory with txid-based filenames
            (todd)
 HDFS-1794. Add code to list which edit logs are available on a remote NN (todd)
-HDFS-1799. Refactor log rolling and filename management into a JournalManager
-           interface. (todd)

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1089992&r1=1089991&r2=1089992&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu Apr  7 20:30:15 2011
@@ -663,11 +663,6 @@ public abstract class Storage extends St
       lock.channel().close();
       lock = null;
     }
-    
-    @Override
-    public String toString() {
-      return "Storage Directory " + this.root;
-    }
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1089992&r1=1089991&r2=1089992&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Thu Apr  7 20:30:15 2011
@@ -235,7 +235,7 @@ public class BackupNode extends NameNode
     }
     if(namesystem == null || namesystem.dir == null || getFSImage() == null)
       return true;
-    return false; // TODO fsImage.getEditLog().getNumJournals() == 0;
+    return fsImage.getEditLog().getNumEditStreams() == 0;
   }
 
   private NamespaceInfo handshake(Configuration conf) throws IOException {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1089992&r1=1089991&r2=1089992&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Apr  7 20:30:15 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -50,9 +49,6 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 
 /**
@@ -68,17 +64,9 @@ public class FSEditLog implements NNStor
 
   private static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
-  private enum State {
-    UNINITIALIZED,
-    WRITING_EDITS,
-    WRITING_EDITS_NEW,
-    CLOSED;
-  }  
-  private State state = State.UNINITIALIZED;
-
+  private volatile int sizeOutputFlushBuffer = 512*1024;
 
-  private List<JournalManager> journals = Lists.newArrayList();
-  private List<JournalManager> faultyJournals = Lists.newArrayList();;
+  private ArrayList<EditLogOutputStream> editStreams = null;
 
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
@@ -103,7 +91,6 @@ public class FSEditLog implements NNStor
 
   private NNStorage storage;
 
-
   private static class TransactionId {
     public long txid;
 
@@ -127,33 +114,32 @@ public class FSEditLog implements NNStor
     lastPrintTime = now();
   }
   
-  /**
-   * Initialize the list of edit journals
-   */
-  private void initJournals() {
-    assert journals.isEmpty();
-    assert faultyJournals.isEmpty();
-    Preconditions.checkState(state == State.UNINITIALIZED,
-        "Bad state: %s", state);
-    
-    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
-      journals.add(new FileJournalManager(sd));
-    }
-    
-    if (journals.isEmpty()) {
-      LOG.error("No edits directories configured!");
-    }
-    
-    state = State.CLOSED;
+  private File getEditFile(StorageDirectory sd) {
+    return storage.getEditFile(sd);
+  }
+  
+  private File getEditNewFile(StorageDirectory sd) {
+    return storage.getEditNewFile(sd);
   }
   
   private int getNumEditsDirs() {
    return storage.getNumStorageDirs(NameNodeDirType.EDITS);
   }
 
-  synchronized boolean isOpen() {
-    return state == State.WRITING_EDITS ||
-           state == State.WRITING_EDITS_NEW;
+  synchronized int getNumEditStreams() {
+    return editStreams == null ? 0 : editStreams.size();
+  }
+
+  /**
+   * Return the currently active edit streams.
+   * This should be used only by unit tests.
+   */
+  ArrayList<EditLogOutputStream> getEditStreams() {
+    return editStreams;
+  }
+
+  boolean isOpen() {
+    return getNumEditStreams() > 0;
   }
 
   /**
@@ -163,33 +149,41 @@ public class FSEditLog implements NNStor
    * @throws IOException
    */
   synchronized void open() throws IOException {
-    if (state == State.UNINITIALIZED) {
-      initJournals();
-    }
-    
-    Preconditions.checkState(state == State.CLOSED,
-        "Bad state: %s", state);
-
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
+    if (editStreams == null)
+      editStreams = new ArrayList<EditLogOutputStream>();
     
-    mapJournalsAndReportErrors(new JournalManagerClosure() {
-      @Override
-      public void apply(JournalManager jm) throws IOException {
-        jm.open();
+    ArrayList<StorageDirectory> al = null;
+    for (Iterator<StorageDirectory> it 
+         = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      File eFile = getEditFile(sd);
+      try {
+        addNewEditLogStream(eFile);
+      } catch (IOException e) {
+        LOG.warn("Unable to open edit log file " + eFile);
+        // Remove the directory from list of storage directories
+        if(al == null) al = new ArrayList<StorageDirectory>(1);
+        al.add(sd);
+        
       }
+    }
     
-    }, "Opening logs");
-    
-    state = State.WRITING_EDITS;
+    if(al != null) storage.reportErrorsOnDirectories(al);
   }
   
-  // TODO remove me!
-  @Deprecated
+  
+  synchronized void addNewEditLogStream(File eFile) throws IOException {
+    EditLogOutputStream eStream = new EditLogFileOutputStream(eFile,
+        sizeOutputFlushBuffer);
+    editStreams.add(eStream);
+  }
+
   synchronized void createEditLogFile(File name) throws IOException {
     waitForSyncToFinish();
 
     EditLogOutputStream eStream = new EditLogFileOutputStream(name,
-        1024);
+        sizeOutputFlushBuffer);
     eStream.create();
     eStream.close();
   }
@@ -198,45 +192,122 @@ public class FSEditLog implements NNStor
    * Shutdown the file store.
    */
   synchronized void close() {
-    LOG.debug("Closing EditLog", new Exception());
-    if (state == State.CLOSED) {
-      LOG.warn("Closing log when already closed", new Exception());
-      return;
-    }
-    
     waitForSyncToFinish();
-    if (journals.isEmpty()) {
+    if (editStreams == null || editStreams.isEmpty()) {
       return;
     }
-
     printStatistics(true);
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
 
-    mapJournalsAndReportErrors(new JournalManagerClosure() {
-      @Override
-      public void apply(JournalManager jm) throws IOException {
-        jm.close();
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    Iterator<EditLogOutputStream> it = getOutputStreamIterator(null);
+    while(it.hasNext()) {
+      EditLogOutputStream eStream = it.next();
+      try {
+        closeStream(eStream);
+      } catch (IOException e) {
+        LOG.warn("FSEditLog:close - failed to close stream " 
+            + eStream.getName());
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
       }
-    }, "closing journal");
+    }
+    disableAndReportErrorOnStreams(errorStreams);
+    editStreams.clear();
+  }
 
-    state = State.CLOSED;
+  /**
+   * Close and remove edit log stream.
+   * @param index of the stream
+   */
+  synchronized private void removeStream(int index) {
+    EditLogOutputStream eStream = editStreams.get(index);
+    try {
+      eStream.close();
+    } catch (Exception e) {}
+    editStreams.remove(index);
+  }
+
+  /**
+   * The specified streams have IO errors. Close and remove them.
+   */
+  synchronized
+  void disableAndReportErrorOnStreams(List<EditLogOutputStream> errorStreams) {
+    if (errorStreams == null || errorStreams.size() == 0) {
+      return;                       // nothing to do
+    }
+    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
+    for (EditLogOutputStream e : errorStreams) {
+      if (e.getType() == JournalType.FILE) {
+        errorDirs.add(getStorageDirectoryForStream(e));
+      } else {
+        disableStream(e);
+      }
+    }
+
+    try {
+      storage.reportErrorsOnDirectories(errorDirs);
+    } catch (IOException ioe) {
+      LOG.error("Problem erroring streams " + ioe);
+    }
+  }
+
+
+  /**
+   * get an editStream corresponding to a sd
+   * @param es - stream to remove
+   * @return the matching stream
+   */
+  StorageDirectory getStorage(EditLogOutputStream es) {
+    String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
+    .getParentFile().getParentFile().getAbsolutePath();
+
+    Iterator<StorageDirectory> it = storage.dirIterator(); 
+    while (it.hasNext()) {
+      StorageDirectory sd = it.next();
+      LOG.info("comparing: " + parentStorageDir + " and " + sd.getRoot().getAbsolutePath());

+      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
+        return sd;
+    }
+    return null;
+  }
+  
+  /**
+   * get an editStream corresponding to a sd
+   * @param sd
+   * @return the matching stream
+   */
+  synchronized EditLogOutputStream getEditsStream(StorageDirectory sd) {
+    for (EditLogOutputStream es : editStreams) {
+      File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
+        .getParentFile().getParentFile();
+      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+        return es;
+    }
+    return null;
+  }
+
+  /**
+   * check if edits.new log exists in the specified stoorage directory
+   */
+  boolean existsNew(StorageDirectory sd) {
+    return getEditNewFile(sd).exists(); 
   }
 
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  void logEdit(final FSEditLogOpCodes opCode, final Writable ... writables) {
-    assert state != State.CLOSED;
-    
+  void logEdit(FSEditLogOpCodes opCode, Writable ... writables) {
     synchronized (this) {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
       
-      if (journals.isEmpty()) {
+      if(getNumEditStreams() == 0)
         throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      }
-      
+      ArrayList<EditLogOutputStream> errorStreams = null;
+
       // Only start a new transaction for OPs which will be persisted to disk.
       // Obviously this excludes control op codes.
       long start = now();
@@ -244,16 +315,19 @@ public class FSEditLog implements NNStor
         start = beginTransaction();
       }
 
-      mapJournalsAndReportErrors(new JournalManagerClosure() {
-        @Override 
-        public void apply(JournalManager jm) throws IOException {
-          EditLogOutputStream stream = jm.getCurrentStream();
-          if(!stream.isOperationSupported(opCode.getOpCode()))
-            return;
-          stream.write(opCode.getOpCode(), txid, writables);
+      for(EditLogOutputStream eStream : editStreams) {
+        if(!eStream.isOperationSupported(opCode.getOpCode()))
+          continue;
+        try {
+          eStream.write(opCode.getOpCode(), txid, writables);
+        } catch (IOException ie) {
+          LOG.error("logEdit: removing "+ eStream.getName(), ie);
+          if(errorStreams == null)
+            errorStreams = new ArrayList<EditLogOutputStream>(1);
+          errorStreams.add(eStream);
         }
-      }, "logging edit");
-
+      }
+      disableAndReportErrorOnStreams(errorStreams);
       endTransaction(start);
       
       // check if it is time to schedule an automatic sync
@@ -297,8 +371,8 @@ public class FSEditLog implements NNStor
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (JournalManager j : journals) {
-      if (j.getCurrentStream().shouldForceSync()) {
+    for (EditLogOutputStream eStream : editStreams) {
+      if (eStream.shouldForceSync()) {
         return true;
       }
     }
@@ -391,15 +465,12 @@ public class FSEditLog implements NNStor
    * waitForSyncToFinish() before assuming they are running alone.
    */
   public void logSync() {
+    ArrayList<EditLogOutputStream> errorStreams = null;
     long syncStart = 0;
 
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
-    
-    List<JournalManager> stillGoodJournals =
-      Lists.newArrayListWithCapacity(journals.size());
-    List<JournalManager> badJournals = Lists.newArrayList();
-    
+    ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
     boolean sync = false;
     try {
       synchronized (this) {
@@ -430,15 +501,20 @@ public class FSEditLog implements NNStor
         sync = true;
   
         // swap buffers
-        assert !journals.isEmpty() : "no editlog streams";
-        
-        for (JournalManager j : journals) {
+        assert editStreams.size() > 0 : "no editlog streams";
+        for(EditLogOutputStream eStream : editStreams) {
           try {
-            j.getCurrentStream().setReadyToFlush();
-            stillGoodJournals.add(j);
+            eStream.setReadyToFlush();
+            streams.add(eStream);
           } catch (IOException ie) {
             LOG.error("Unable to get ready to flush.", ie);
-            badJournals.add(j);
+            //
+            // remember the streams that encountered an error.
+            //
+            if (errorStreams == null) {
+              errorStreams = new ArrayList<EditLogOutputStream>(1);
+            }
+            errorStreams.add(eStream);
           }
         }
         } finally {
@@ -449,19 +525,22 @@ public class FSEditLog implements NNStor
   
       // do the sync
       long start = now();
-      for (JournalManager j : stillGoodJournals) {
+      for (EditLogOutputStream eStream : streams) {
         try {
-          j.getCurrentStream().flush();
+          eStream.flush();
         } catch (IOException ie) {
           LOG.error("Unable to sync edit log.", ie);
           //
           // remember the streams that encountered an error.
           //
-          badJournals.add(j);
+          if (errorStreams == null) {
+            errorStreams = new ArrayList<EditLogOutputStream>(1);
+          }
+          errorStreams.add(eStream);
         }
       }
       long elapsed = now() - start;
-      disableAndReportErrorOnJournals(badJournals);
+      disableAndReportErrorOnStreams(errorStreams);
   
       if (metrics != null) // Metrics non-null only when used inside name node
         metrics.syncs.inc(elapsed);
@@ -485,7 +564,7 @@ public class FSEditLog implements NNStor
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (journals.isEmpty()) {
+    if (editStreams == null || editStreams.size()==0) {
       return;
     }
     lastPrintTime = now;
@@ -497,11 +576,12 @@ public class FSEditLog implements NNStor
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    buf.append(journals.get(0).getCurrentStream().getNumSync());
+    buf.append(editStreams.get(0).getNumSync());
     buf.append(" SyncTimes(ms): ");
 
-    for (JournalManager j : journals) {
-      EditLogOutputStream eStream = j.getCurrentStream();
+    int numEditStreams = editStreams.size();
+    for (int idx = 0; idx < numEditStreams; idx++) {
+      EditLogOutputStream eStream = editStreams.get(idx);
       buf.append(eStream.getTotalSyncTime());
       buf.append(" ");
     }
@@ -709,37 +789,29 @@ public class FSEditLog implements NNStor
    * Return the size of the current EditLog
    */
   synchronized long getEditLogSize() throws IOException {
-    assert getNumEditsDirs() <= journals.size() :
+    assert getNumEditsDirs() <= getNumEditStreams() : 
         "Number of edits directories should not exceed the number of streams.";
     long size = 0;
-        
-    List<JournalManager> badJournals = Lists.newArrayList();
-    
-    for (JournalManager j : journals) {
-      EditLogOutputStream es = j.getCurrentStream();
+    ArrayList<EditLogOutputStream> al = null;
+    for (int idx = 0; idx < getNumEditStreams(); idx++) {
+      EditLogOutputStream es = editStreams.get(idx);
       try {
         long curSize = es.length();
         assert (size == 0 || size == curSize || curSize ==0) :
           "Wrong streams size";
         size = Math.max(size, curSize);
       } catch (IOException e) {
-        LOG.error("getEditLogSize: editstream.length failed. removing journal " + j, e);
-        badJournals.add(j);
+        LOG.error("getEditLogSize: editstream.length failed. removing editlog (" +
+            idx + ") " + es.getName());
+        if(al==null) al = new ArrayList<EditLogOutputStream>(1);
+        al.add(es);
       }
     }
-    disableAndReportErrorOnJournals(badJournals);
-    
+    if(al!=null) disableAndReportErrorOnStreams(al);
     return size;
   }
   
   /**
-   * Used only by unit tests.
-   */
-  List<JournalManager> getJournals() {
-    return journals;
-  }
-  
-  /**
    * Return a manifest of what finalized edit logs are available
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
@@ -760,22 +832,29 @@ public class FSEditLog implements NNStor
    *         in the new log
    */
   synchronized void rollEditLog() throws IOException {
-    Preconditions.checkState(state == State.WRITING_EDITS ||
-                             state == State.WRITING_EDITS_NEW,
-                             "Bad state: %s", state);
-    if (state == State.WRITING_EDITS_NEW){
-      LOG.debug("Tried to roll edit logs when already rolled");
+    waitForSyncToFinish();
+    Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
+    if(!it.hasNext()) 
       return;
+    //
+    // If edits.new already exists in some directory, verify it
+    // exists in all directories.
+    //
+    boolean alreadyExists = existsNew(it.next());
+    while(it.hasNext()) {
+      StorageDirectory sd = it.next();
+      if(alreadyExists != existsNew(sd))
+        throw new IOException(getEditNewFile(sd) 
+              + "should " + (alreadyExists ? "" : "not ") + "exist.");
     }
-
-    waitForSyncToFinish();
+    if(alreadyExists)
+      return; // nothing to do, edits.new exists!
 
     // check if any of failed storage is now available and put it back
     storage.attemptRestoreRemovedStorage();
 
     divertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
-    state = State.WRITING_EDITS_NEW;
   }
 
   /**
@@ -785,33 +864,48 @@ public class FSEditLog implements NNStor
    * @param dest new stream path relative to the storage directory root.
    * @throws IOException
    */
-  synchronized void divertFileStreams(final String dest) throws IOException {
-    Preconditions.checkState(state == State.WRITING_EDITS,
-        "Bad state: " + state);
-
+  synchronized void divertFileStreams(String dest) throws IOException {
     waitForSyncToFinish();
 
-    mapJournalsAndReportErrors(new JournalManagerClosure() {
-      
-      @Override
-      public void apply(JournalManager jm) throws IOException {
-        jm.divertFileStreams(dest);
-      }
-    }, "Diverting file streams to " + dest);
+    assert getNumEditStreams() >= getNumEditsDirs() :
+      "Inconsistent number of streams";
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    EditStreamIterator itE = 
+      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
+    Iterator<StorageDirectory> itD = 
+      storage.dirIterator(NameNodeDirType.EDITS);
+    while(itE.hasNext() && itD.hasNext()) {
+      EditLogOutputStream eStream = itE.next();
+      StorageDirectory sd = itD.next();
+      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
+        throw new IOException("Inconsistent order of edit streams: " + eStream);
+      try {
+        // close old stream
+        closeStream(eStream);
+        // create new stream
+        eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
+            sizeOutputFlushBuffer);
+        eStream.create();
+        // replace by the new stream
+        itE.replace(eStream);
+      } catch (IOException e) {
+        LOG.warn("Error in editStream " + eStream.getName(), e);
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
       }
+    }
+    disableAndReportErrorOnStreams(errorStreams);
+  }
 
   /**
    * Removes the old edit log and renames edits.new to edits.
    * Reopens the edits file.
    */
   synchronized void purgeEditLog() throws IOException {
-    Preconditions.checkState(state == State.WRITING_EDITS_NEW,
-        "Bad state: " + state);
-
     waitForSyncToFinish();
     revertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
-    state = State.WRITING_EDITS;
   }
 
 
@@ -836,17 +930,65 @@ public class FSEditLog implements NNStor
    * @param dest new stream path relative to the storage directory root.
    * @throws IOException
    */
-  synchronized void revertFileStreams(final String source) throws IOException {
+  synchronized void revertFileStreams(String source) throws IOException {
     waitForSyncToFinish();
 
-    mapJournalsAndReportErrors(new JournalManagerClosure() {
-
-      @Override
-      public void apply(JournalManager jm) throws IOException {
-        jm.revertFileStreams(source);
+    assert getNumEditStreams() >= getNumEditsDirs() :
+      "Inconsistent number of streams";
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    EditStreamIterator itE = 
+      (EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
+    Iterator<StorageDirectory> itD = 
+      storage.dirIterator(NameNodeDirType.EDITS);
+    while(itE.hasNext() && itD.hasNext()) {
+      EditLogOutputStream eStream = itE.next();
+      StorageDirectory sd = itD.next();
+      if(!eStream.getName().startsWith(sd.getRoot().getPath()))
+        throw new IOException("Inconsistent order of edit streams: " + eStream +
+                              " does not start with " + sd.getRoot().getPath());
+      try {
+        // close old stream
+        closeStream(eStream);
+        // rename edits.new to edits
+        File editFile = getEditFile(sd);
+        File prevEditFile = new File(sd.getRoot(), source);
+        if(prevEditFile.exists()) {
+          if(!prevEditFile.renameTo(editFile)) {
+            //
+            // renameTo() fails on Windows if the destination
+            // file exists.
+            //
+            if(!editFile.delete() || !prevEditFile.renameTo(editFile)) {
+              throw new IOException("Rename failed for " + sd.getRoot());
+            }
+          }
+        }
+        // open new stream
+        eStream = new EditLogFileOutputStream(editFile, sizeOutputFlushBuffer);
+        // replace by the new stream
+        itE.replace(eStream);
+      } catch (IOException e) {
+        LOG.warn("Error in editStream " + eStream.getName(), e);
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
       }
-      
-    }, "Reverting file streams to " + source);
+    }
+    disableAndReportErrorOnStreams(errorStreams);
+  }
+
+  /**
+   * Return the name of the edit file
+   */
+  synchronized File getFsEditName() {
+    StorageDirectory sd = null;   
+    for (Iterator<StorageDirectory> it = 
+      storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      sd = it.next();   
+      if(sd.getRoot().canRead())
+        return getEditFile(sd);
+    }
+    return null;
   }
 
   /**
@@ -855,7 +997,7 @@ public class FSEditLog implements NNStor
   synchronized long getFsEditTime() {
     Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.EDITS);
     if(it.hasNext())
-      return NNStorage.getEditFile(it.next()).lastModified();
+      return getEditFile(it.next()).lastModified();
     return 0;
   }
 
@@ -870,9 +1012,7 @@ public class FSEditLog implements NNStor
 
   // sets the initial capacity of the flush buffer.
   public void setBufferCapacity(int size) {
-    for (JournalManager jm : journals) {
-      jm.setBufferCapacity(size);
-    }
+    sizeOutputFlushBuffer = size;
   }
 
 
@@ -894,7 +1034,6 @@ public class FSEditLog implements NNStor
   synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
                       NamenodeRegistration nnReg) // active name-node
   throws IOException {
-    /*
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
     if(editStreams == null)
@@ -911,29 +1050,99 @@ public class FSEditLog implements NNStor
       editStreams.add(boStream);
     }
     logEdit(OP_JSPOOL_START, (Writable[])null);
-    TODO: backupnode is disabled
-    */
   }
 
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  synchronized void logEdit(final int length, final byte[] data) {
-    if (journals.isEmpty())
+  synchronized void logEdit(int length, byte[] data) {
+    if(getNumEditStreams() == 0)
       throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
+    ArrayList<EditLogOutputStream> errorStreams = null;
     long start = beginTransaction();
-    
-    mapJournalsAndReportErrors(new JournalManagerClosure() {
-      @Override
-      public void apply(JournalManager jm) throws IOException {
-        jm.getCurrentStream().write(data, 0, length);        
-      }      
-    }, "Logging edit");
-
+    for(EditLogOutputStream eStream : editStreams) {
+      try {
+        eStream.write(data, 0, length);
+      } catch (IOException ie) {
+        LOG.warn("Error in editStream " + eStream.getName(), ie);
+        if(errorStreams == null)
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        errorStreams.add(eStream);
+      }
+    }
+    disableAndReportErrorOnStreams(errorStreams);
     endTransaction(start);
   }
 
+  /**
+   * Iterates output streams based of the same type.
+   * Type null will iterate over all streams.
+   */
+  private class EditStreamIterator implements Iterator<EditLogOutputStream> {
+    JournalType type;
+    int prevIndex; // for remove()
+    int nextIndex; // for next()
+
+    EditStreamIterator(JournalType streamType) {
+      this.type = streamType;
+      this.nextIndex = 0;
+      this.prevIndex = 0;
+    }
+
+    public boolean hasNext() {
+      synchronized(FSEditLog.this) {
+        if(editStreams == null || 
+           editStreams.isEmpty() || nextIndex >= editStreams.size())
+          return false;
+        while(nextIndex < editStreams.size()
+              && !editStreams.get(nextIndex).getType().isOfType(type))
+          nextIndex++;
+        return nextIndex < editStreams.size();
+      }
+    }
+
+    public EditLogOutputStream next() {
+      EditLogOutputStream stream = null;
+      synchronized(FSEditLog.this) {
+        stream = editStreams.get(nextIndex);
+        prevIndex = nextIndex;
+        nextIndex++;
+        while(nextIndex < editStreams.size()
+            && !editStreams.get(nextIndex).getType().isOfType(type))
+        nextIndex++;
+      }
+      return stream;
+    }
+
+    public void remove() {
+      nextIndex = prevIndex; // restore previous state
+      removeStream(prevIndex); // remove last returned element
+      hasNext(); // reset nextIndex to correct place
+    }
+
+    void replace(EditLogOutputStream newStream) {
+      synchronized (FSEditLog.this) {
+        assert 0 <= prevIndex && prevIndex < editStreams.size() :
+                                                          "Index out of bound.";
+        editStreams.set(prevIndex, newStream);
+      }
+    }
+  }
+
+  /**
+   * Get stream iterator for the specified type.
+   */
+  public Iterator<EditLogOutputStream>
+  getOutputStreamIterator(JournalType streamType) {
+    return new EditStreamIterator(streamType);
+  }
+
+  private void closeStream(EditLogOutputStream eStream) throws IOException {
+    eStream.setReadyToFlush();
+    eStream.flush();
+    eStream.close();
+  }
 
   void incrementCheckpointTime() {
     storage.incrementCheckpointTime();
@@ -942,7 +1151,6 @@ public class FSEditLog implements NNStor
   }
 
   synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    /*
     Iterator<EditLogOutputStream> it =
                                   getOutputStreamIterator(JournalType.BACKUP);
     ArrayList<EditLogOutputStream> errorStreams = null;
@@ -959,14 +1167,11 @@ public class FSEditLog implements NNStor
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnJournals(errorStreams);
-    TODO BN currently disabled
-    */
+    disableAndReportErrorOnStreams(errorStreams);
   }
 
   synchronized boolean checkBackupRegistration(
       NamenodeRegistration registration) {
-    /*
     Iterator<EditLogOutputStream> it =
                                   getOutputStreamIterator(JournalType.BACKUP);
     boolean regAllowed = !it.hasNext();
@@ -989,12 +1194,8 @@ public class FSEditLog implements NNStor
     }
     assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
       "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnJournals(errorStreams);
+    disableAndReportErrorOnStreams(errorStreams);
     return regAllowed;
-    
-    TODO BN currently disabled
-    */
-    return false;
   }
   
   static BytesWritable toBytesWritable(Options.Rename... options) {
@@ -1004,81 +1205,40 @@ public class FSEditLog implements NNStor
     }
     return new BytesWritable(bytes);
   }
-  
-  //// Iteration across journals
-  private interface JournalManagerClosure {
-    public void apply(JournalManager jm) throws IOException;
-  }
 
   /**
-   * Apply the given function across all of the journal managers, disabling
-   * any for which the closure throws an IOException.
-   * @param status message used for logging errors (e.g. "opening journal")
+   * Get the StorageDirectory for a stream
+   * @param es Stream whose StorageDirectory we wish to know
+   * @return the matching StorageDirectory
    */
-  private void mapJournalsAndReportErrors(
-      JournalManagerClosure closure, String status) {
-    ArrayList<JournalManager> badJournals = Lists.newArrayList();
-    for (JournalManager j : journals) {
-      try {
-        closure.apply(j);
-      } catch (IOException ioe) {
-        LOG.error("Error " + status + " (journal " + j + ")", ioe);
-        badJournals.add(j);
-      }
-    }
-    disableAndReportErrorOnJournals(badJournals);
-  }
-  
-  /**
-   * Called when some journals experience an error in some operation.
-   * This propagates errors to the storage level.
-   */
-  void disableAndReportErrorOnJournals(List<JournalManager> badJournals) {
-    if (badJournals == null || badJournals.isEmpty()) {
-      return; // nothing to do
-    }
+  StorageDirectory getStorageDirectoryForStream(EditLogOutputStream es) {
+    String parentStorageDir = ((EditLogFileOutputStream)es).getFile().getParentFile().getParentFile().getAbsolutePath();
 
-    ArrayList<StorageDirectory> errorDirs = new ArrayList<StorageDirectory>();
-    for (JournalManager j : badJournals) {
-      LOG.error("Disabling journal " + j);
-      StorageDirectory sd = j.getStorageDirectory();
-      if (sd != null) {
-        errorDirs.add(sd);
-        // We will report this error to storage, which will propagate back
-        // to our listener interface, at which point we'll mark it faulty.
-      } else {
-        // Just mark it faulty ourselves, since it's not associated with a
-        // storage directory.
-        markJournalFaulty(j);
-      }
-    }
-    
-    try {
-      storage.reportErrorsOnDirectories(errorDirs);
-    } catch (IOException ioe) {
-      LOG.error("Problem reporting error on directories ", ioe);
+    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      StorageDirectory sd = it.next();
+      FSNamesystem.LOG.info("comparing: " + parentStorageDir 
+                            + " and " + sd.getRoot().getAbsolutePath()); 
+      if (parentStorageDir.equals(sd.getRoot().getAbsolutePath()))
+        return sd;
     }
+    return null;
   }
- 
-  private synchronized void markJournalFaulty(JournalManager journal) {
-    try {
-      journal.abort();
-    } catch (IOException e) {
-      LOG.warn("Failed to abort faulty journal " + journal
-          + " before removing it (might be OK)", e);
+
+  private synchronized void disableStream(EditLogOutputStream stream) {
+    try { stream.close(); } catch (IOException e) {
+      // nothing to do.
+      LOG.warn("Failed to close eStream " + stream.getName()
+               + " before removing it (might be ok)");
     }
-    journals.remove(journal);
-    faultyJournals.add(journal);
+    editStreams.remove(stream);
 
-    if (journals.isEmpty()) {
-      String msg = "Fatal Error: All journals are inaccessible.";
+    if (editStreams.size() <= 0) {
+      String msg = "Fatal Error: All storage directories are inaccessible.";
       LOG.fatal(msg, new IOException(msg));
       Runtime.getRuntime().exit(-1);
     }
   }
 
-
-
   /**
    * Error Handling on a storageDirectory
    *
@@ -1087,18 +1247,22 @@ public class FSEditLog implements NNStor
   @Override // NNStorageListener
   public synchronized void errorOccurred(StorageDirectory sd)
       throws IOException {
-    
-    LOG.debug("Error occurred on " + sd);
-    
-    for (JournalManager jm : journals) {
-      if (jm.getStorageDirectory() == sd) {
-        LOG.warn("Marking corresponding journal " + jm + " faulty");
-        markJournalFaulty(jm);
-        return;
+    ArrayList<EditLogOutputStream> errorStreams
+      = new ArrayList<EditLogOutputStream>();
+
+    for (EditLogOutputStream eStream : editStreams) {
+      LOG.error("Unable to log edits to " + eStream.getName()
+                + "; removing it");
+
+      StorageDirectory streamStorageDir = getStorageDirectoryForStream(eStream);
+      if (sd == streamStorageDir) {
+        errorStreams.add(eStream);
       }
     }
-    
-    LOG.debug("Faulty " + sd + " did not correspond to any live journal manager.");
+
+    for (EditLogOutputStream eStream : errorStreams) {
+      disableStream(eStream);
+    }
   }
 
   @Override // NNStorageListener
@@ -1112,26 +1276,9 @@ public class FSEditLog implements NNStor
   @Override // NNStorageListener
   public synchronized void directoryAvailable(StorageDirectory sd)
       throws IOException {
-    for (JournalManager jm : journals) {
-      assert (jm.getStorageDirectory() != sd) :
-        "Storage directory " + sd + " being restored but wasn't marked faulty";
-    }
-
-    for (Iterator<JournalManager> iter = faultyJournals.iterator();
-         iter.hasNext();) {
-      JournalManager jm = iter.next();
-      if (jm.getStorageDirectory() == sd) {
-        try {
-          jm.restore();
-          iter.remove();
-          journals.add(jm);
-        } catch (IOException ioe) {
-          // TODO make sure this code is covered by unit tests!
-          LOG.error("Unable to restore storage directory " + sd, ioe);
-          storage.reportErrorsOnDirectory(sd);
-        }
-        break;
-      }
+    if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+      File eFile = getEditFile(sd);
+      addNewEditLogStream(eFile);
     }
   }
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1089992&r1=1089991&r2=1089992&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Thu Apr  7 20:30:15 2011
@@ -754,14 +754,14 @@ public class NNStorage extends Storage i
   /**
    * @return A editlog File in storage directory 'sd'.
    */
-  static File getEditFile(StorageDirectory sd) {
+  File getEditFile(StorageDirectory sd) {
     return getStorageFile(sd, NameNodeFile.EDITS);
   }
 
   /**
    * @return A temporary editlog File in storage directory 'sd'.
    */
-  static File getEditNewFile(StorageDirectory sd) {
+  File getEditNewFile(StorageDirectory sd) {
     return getStorageFile(sd, NameNodeFile.EDITS_NEW);
   }
 

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1089992&r1=1089991&r2=1089992&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java Thu Apr  7 20:30:15 2011
@@ -341,11 +341,9 @@ public class TestEditLogRace {
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      FileJournalManager journal = (FileJournalManager)
-          editLog.getJournals().get(0);
-      EditLogFileOutputStream spyElos =
-          spy((EditLogFileOutputStream)journal.getCurrentStream());
-      journal.setCurrentStreamForTests(spyElos);
+      ArrayList<EditLogOutputStream> streams = editLog.getEditStreams();
+      EditLogOutputStream spyElos = spy(streams.get(0));
+      streams.set(0, spyElos);
 
       final AtomicReference<Throwable> deferredException =
           new AtomicReference<Throwable>();



Mime
View raw message