hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1095868 - in /hadoop/hdfs/branches/HDFS-1073: CHANGES.HDFS-1073.txt src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
Date Fri, 22 Apr 2011 05:39:05 GMT
Author: todd
Date: Fri Apr 22 05:39:04 2011
New Revision: 1095868

URL: http://svn.apache.org/viewvc?rev=1095868&view=rev
Log:
HDFS-1859. Add some convenience functions to iterate over edit log streams. Contributed by
Ivan Kelly and Todd Lipcon.

Modified:
    hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt?rev=1095868&r1=1095867&r2=1095868&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.HDFS-1073.txt Fri Apr 22 05:39:04 2011
@@ -14,3 +14,5 @@ HDFS-1793. Add code to inspect a storage
 HDFS-1794. Add code to list which edit logs are available on a remote NN (todd)
 HDFS-1858. Add state management variables to FSEditLog (Ivan Kelly and Todd
            Lipcon via todd)
+HDFS-1859. Add some convenience functions to iterate over edit log streams
+           (Ivan Kelly and Todd Lipcon via todd)

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1095868&r1=1095867&r2=1095868&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Apr 22 05:39:04 2011
@@ -252,21 +252,13 @@ public class FSEditLog implements NNStor
     printStatistics(true);
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
 
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    Iterator<EditLogOutputStream> it = getOutputStreamIterator(null);
-    while(it.hasNext()) {
-      EditLogOutputStream eStream = it.next();
-      try {
-        closeStream(eStream);
-      } catch (IOException e) {
-        LOG.warn("FSEditLog:close - failed to close stream " 
-            + eStream.getName());
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-      }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
+    mapStreamsAndReportErrors(new StreamClosure() {
+        @Override
+        public void apply(EditLogOutputStream stream) throws IOException {
+          closeStream(stream);
+        }      
+      }, "Closing stream");
+   
     editStreams.clear();
 
     state = State.CLOSED;
@@ -354,7 +346,8 @@ public class FSEditLog implements NNStor
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  void logEdit(FSEditLogOpCodes opCode, Writable ... writables) {
+
+  void logEdit(final FSEditLogOpCodes opCode, final Writable ... writables) {
     assert state != State.UNINITIALIZED && state != State.CLOSED;
 
     synchronized (this) {
@@ -363,7 +356,6 @@ public class FSEditLog implements NNStor
       
       if(getNumEditStreams() == 0)
         throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      ArrayList<EditLogOutputStream> errorStreams = null;
 
       // Only start a new transaction for OPs which will be persisted to disk.
       // Obviously this excludes control op codes.
@@ -372,19 +364,15 @@ public class FSEditLog implements NNStor
         start = beginTransaction();
       }
 
-      for(EditLogOutputStream eStream : editStreams) {
-        if(!eStream.isOperationSupported(opCode.getOpCode()))
-          continue;
-        try {
-          eStream.write(opCode.getOpCode(), txid, writables);
-        } catch (IOException ie) {
-          LOG.error("logEdit: removing "+ eStream.getName(), ie);
-          if(errorStreams == null)
-            errorStreams = new ArrayList<EditLogOutputStream>(1);
-          errorStreams.add(eStream);
-        }
-      }
-      disableAndReportErrorOnStreams(errorStreams);
+      mapStreamsAndReportErrors(new StreamClosure() {
+          @Override
+          public void apply(EditLogOutputStream stream) throws IOException {
+            if(!stream.isOperationSupported(opCode.getOpCode()))
+              return;
+            stream.write(opCode.getOpCode(), txid, writables);
+          }      
+        }, "Writing op to stream");
+
       endTransaction(start);
       
       // check if it is time to schedule an automatic sync
@@ -1129,22 +1117,18 @@ public class FSEditLog implements NNStor
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  synchronized void logEdit(int length, byte[] data) {
+  synchronized void logEdit(final int length, final byte[] data) {
     if(getNumEditStreams() == 0)
       throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-    ArrayList<EditLogOutputStream> errorStreams = null;
     long start = beginTransaction();
-    for(EditLogOutputStream eStream : editStreams) {
-      try {
-        eStream.write(data, 0, length);
-      } catch (IOException ie) {
-        LOG.warn("Error in editStream " + eStream.getName(), ie);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-      }
-    }
-    disableAndReportErrorOnStreams(errorStreams);
+
+    mapStreamsAndReportErrors(new StreamClosure() {
+        @Override
+        public void apply(EditLogOutputStream stream) throws IOException {
+          stream.write(data, 0, length);
+        }
+      }, "Writing op to stream");
+
     endTransaction(start);
   }
 
@@ -1357,4 +1341,27 @@ public class FSEditLog implements NNStor
       addNewEditLogStream(eFile);
     }
   }
+  
+  //// Iteration across streams
+  private interface StreamClosure {
+    public void apply(EditLogOutputStream jm) throws IOException;
+  }
+ 
+  /**
+   * Apply the given function across all of the edit streams, disabling
+   * any for which the closure throws an IOException.
+   * @param status message used for logging errors (e.g. "opening journal")
+   */
+  private void mapStreamsAndReportErrors(StreamClosure closure, String status) {
+    ArrayList<EditLogOutputStream> badStreams = new ArrayList<EditLogOutputStream>();
+    for (EditLogOutputStream stream : editStreams) {
+      try {
+        closure.apply(stream);
+      } catch (IOException ioe) {
+        LOG.error("Error " + status + " (stream " + stream + ")", ioe);
+        badStreams.add(stream);
+      }
+    }
+    disableAndReportErrorOnStreams(badStreams);
+  }
 }



Mime
View raw message