chukwa-commits mailing list archives

From: billgra...@apache.org
Subject: svn commit: r1038719 - in /incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection: collector/servlet/ServletCollector.java writer/SeqFileWriter.java
Date: Wed, 24 Nov 2010 17:47:47 GMT
Author: billgraham
Date: Wed Nov 24 17:47:47 2010
New Revision: 1038719

URL: http://svn.apache.org/viewvc?rev=1038719&view=rev
Log:
CHUKWA-533. Improve fault-tolerance of collector
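
The collector-side change below maps a ChukwaWriter.COMMIT_FAIL result to HTTP 503 (SC_SERVICE_UNAVAILABLE) instead of answering 200 unconditionally, so a sender can tell a failed commit apart from a successful or pending one and resend its chunks. The following is a minimal illustrative sketch of a sender honoring that contract; it is not Chukwa agent code, and the class and method names (CollectorPostSketch, postOnce, postWithRetry) are assumptions made for the example.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative sender-side retry loop (not part of this commit). The collector
// now answers 503 when the writer reports COMMIT_FAIL, so the sender keeps its
// chunks and tries again instead of assuming they were persisted.
public class CollectorPostSketch {

  // Posts one payload; true only when the collector acknowledged with 200.
  static boolean postOnce(URL collector, byte[] payload) throws Exception {
    HttpURLConnection conn = (HttpURLConnection) collector.openConnection();
    conn.setDoOutput(true);
    conn.setRequestMethod("POST");
    OutputStream out = conn.getOutputStream();
    try {
      out.write(payload);
    } finally {
      out.close();
    }
    // 503 (SC_SERVICE_UNAVAILABLE) now means "commit failed, retry later".
    return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
  }

  // Retries a few times with a fixed back-off before giving up.
  static boolean postWithRetry(URL collector, byte[] payload,
                               int attempts, long backoffMillis) throws Exception {
    for (int i = 0; i < attempts; i++) {
      if (postOnce(collector, payload)) {
        return true;
      }
      Thread.sleep(backoffMillis);
    }
    return false;
  }
}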

Modified:
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=1038719&r1=1038718&r2=1038719&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Wed Nov 24 17:47:47 2010
@@ -147,14 +147,19 @@ public class ServletCollector extends Ht
         }
       }
 
+      int responseStatus = HttpServletResponse.SC_OK;
+
       // write new data to data sync file
       if (writer != null) {
         ChukwaWriter.CommitStatus result = writer.add(events);
-        numberchunks += events.size();
-        lifetimechunks += events.size();
+
         // this is where we ACK this connection
 
         if(result == ChukwaWriter.COMMIT_OK) {
+          // only count the chunks if result is commit or commit pending
+          numberchunks += events.size();
+          lifetimechunks += events.size();
+
           for(Chunk receivedChunk: events) {
             sb.append(ACK_PREFIX);
             sb.append(receivedChunk.getData().length);
@@ -162,10 +167,18 @@ public class ServletCollector extends Ht
             sb.append(receivedChunk.getSeqID() - 1).append("\n");
           }
         } else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
+
+          // only count the chunks if result is commit or commit pending
+          numberchunks += events.size();
+          lifetimechunks += events.size();
+
           for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
             sb.append(s);
+        } else if(result == ChukwaWriter.COMMIT_FAIL) {
+          sb.append("Commit failed");
+          responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
         }
-        
+
         l_out.print(sb.toString());
       } else {
         l_out.println("can't write: no writer");
@@ -175,7 +188,7 @@ public class ServletCollector extends Ht
         diagnosticPage.doneWithPost();
       }
 
-      resp.setStatus(200);
+      resp.setStatus(responseStatus);
 
     } catch (Throwable e) {
       log.warn("Exception talking to " + req.getRemoteHost() + " at t="

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=1038719&r1=1038718&r2=1038719&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Wed Nov 24 17:47:47 2010
@@ -28,6 +28,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.io.IOException;
 
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
@@ -80,7 +81,7 @@ public class SeqFileWriter extends Pipel
   protected volatile long dataSize = 0;
   protected volatile long bytesThisRotate = 0;
   protected volatile boolean isRunning = false;
-  
+
   static {
     try {
       localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
@@ -126,7 +127,7 @@ public class SeqFileWriter extends Pipel
       if (fs == null) {
         log.error("can't connect to HDFS at " + fs.getUri() + " bail out!");
         DaemonWatcher.bailout(-1);
-      } 
+      }
     } catch (Throwable e) {
       log.error(
           "can't connect to HDFS, trying default file system instead (likely to be local)",
@@ -183,9 +184,7 @@ public class SeqFileWriter extends Pipel
     newName = newName.replace(".", "");
     newName = outputDir + "/" + newName.trim();
 
-    boolean bailOut = false;
-
-     try {
+    try {
       lock.acquire();
 
       FSDataOutputStream previousOutputStr = currentOutputStr;
@@ -193,43 +192,52 @@ public class SeqFileWriter extends Pipel
       String previousFileName = currentFileName;
 
       if (previousOutputStr != null) {
-        previousOutputStr.close();
+        boolean closed = false;
+        try {
+          log.info("closing sink file " + previousFileName);
+          previousOutputStr.close();
+          closed = true;
+        } catch (Throwable e) {
+          log.error("couldn't close file " + previousFileName, e);
+          // we probably have an orphaned 0-byte file at this point due to an
+          // intermittent HDFS outage. Once HDFS comes up again we'll be able to
+          // close it, although it will be empty.
+        }
+
         if (bytesThisRotate > 0) {
-          log.info("rotating sink file " + previousPath);
-          fs.rename(previousPath, new Path(previousFileName + ".done"));
+          if (closed) {
+            log.info("rotating sink file " + previousPath);
+            fs.rename(previousPath, new Path(previousFileName + ".done"));
+          }
+          else {
+            log.warn(bytesThisRotate + " bytes potentially lost, since " +
+                    previousPath + " could not be closed.");
+          }
         } else {
           log.info("no chunks written to " + previousPath + ", deleting");
           fs.delete(previousPath, false);
         }
       }
+
       Path newOutputPath = new Path(newName + ".chukwa");
       FSDataOutputStream newOutputStr = fs.create(newOutputPath);
-      currentOutputStr = newOutputStr;
-      currentPath = newOutputPath;
-      currentFileName = newName;
-      bytesThisRotate = 0;
       // Uncompressed for now
       seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
           ChukwaArchiveKey.class, ChunkImpl.class,
           SequenceFile.CompressionType.NONE, null);
+
+      // reset these once we know that seqFileWriter was created
+      currentOutputStr = newOutputStr;
+      currentPath = newOutputPath;
+      currentFileName = newName;
+      bytesThisRotate = 0;
     } catch (Throwable e) {
-      log.warn("Got an exception in rotate",e);
-      bailOut = true;
-      isRunning = false;
+      log.warn("Got an exception trying to rotate. Will try again in " +
+              rotateInterval/1000 + " seconds." ,e);
     } finally {
       lock.release();
     }
     
-    if (bailOut) {
-      log.fatal("IO Exception in rotate. Exiting!");
-      // As discussed for now:
-      // Everytime this happen in the past it was because HDFS was down,
-      // so there's nothing we can do
-      // Shutting down the collector for now
-      // Watchdog will re-start it automatically
-      DaemonWatcher.bailout(-1);
-    }   
-
     // Schedule the next timer
     rotateTimer = new Timer();
     rotateTimer.schedule(new TimerTask() {
@@ -237,7 +245,7 @@ public class SeqFileWriter extends Pipel
         rotate();
       }
     }, rotateInterval);
-    
+
   }
 
   
@@ -277,17 +285,26 @@ public class SeqFileWriter extends Pipel
           archiveKey.setSeqId(chunk.getSeqID());
 
           if (chunk != null) {
-            // compute size for stats
+            seqFileWriter.append(archiveKey, chunk);
+
+            // compute size for stats only if append succeeded. Note though that
+            // seqFileWriter.append can continue taking data for quite some time
+            // after HDFS goes down while the client is trying to reconnect. Hence
+            // these stats might not reflect reality during an HDFS outage.
             dataSize += chunk.getData().length;
             bytesThisRotate += chunk.getData().length;
-            seqFileWriter.append(archiveKey, chunk);
 
             String futureName = currentPath.getName().replace(".chukwa", ".done");
             result.addPend(futureName, currentOutputStr.getPos());
           }
 
         }
-      } catch (Throwable e) {
+      }
+      catch (IOException e) {
+        log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
+        return COMMIT_FAIL;
+      }
+      catch (Throwable e) {
         // We don't want to loose anything
        log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
         DaemonWatcher.bailout(-1);
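
The rotate() change above removes the DaemonWatcher.bailout(-1) path: a failed rotation is logged together with the retry interval, and the next scheduled rotation simply tries again, so a transient HDFS outage no longer takes the whole collector down. A standalone sketch of that log-and-retry-on-the-next-tick pattern follows; the class and method names (RotateRetrySketch, openNewSinkFileAndCloseOldOne) are illustrative assumptions, not Chukwa code.

import java.util.Timer;
import java.util.TimerTask;

// Illustrative sketch of the rotation pattern this commit adopts: a failed
// rotation is logged and retried on the next timer tick instead of
// terminating the process.
public class RotateRetrySketch {

  private static final long ROTATE_INTERVAL_MS = 5 * 60 * 1000L;
  private final Timer rotateTimer = new Timer("rotate", true);

  void scheduleNextRotate() {
    rotateTimer.schedule(new TimerTask() {
      public void run() {
        rotate();
      }
    }, ROTATE_INTERVAL_MS);
  }

  void rotate() {
    try {
      // placeholder for the real work: close the previous .chukwa file,
      // rename it to .done when it holds data, open a fresh SequenceFile
      openNewSinkFileAndCloseOldOne();
    } catch (Throwable e) {
      // Transient failures (e.g. an HDFS outage) no longer kill the collector;
      // the next scheduled rotation simply tries again.
      System.err.println("rotate failed, will retry in "
          + ROTATE_INTERVAL_MS / 1000 + "s: " + e);
    } finally {
      scheduleNextRotate();
    }
  }

  void openNewSinkFileAndCloseOldOne() throws Exception {
    // no-op in this sketch
  }
}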


