chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r817532 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
Date Tue, 22 Sep 2009 07:58:03 GMT
Author: asrabkin
Date: Tue Sep 22 07:58:03 2009
New Revision: 817532

URL: http://svn.apache.org/viewvc?rev=817532&view=rev
Log:
CHUKWA-392.  FIFO queueing of threads in collector.

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=817532&r1=817531&r2=817532&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Tue Sep 22 07:58:03 2009
@@ -58,6 +58,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-392.  FIFO queueing of threads in collector. (asrabkin)
+
     CHUKWA-388.  Clean up user interface color.  (Eric Yang)
 
     CHUKWA-387.  Summarize mode for dumpChunks should count bytes. (asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=817532&r1=817531&r2=817532&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Tue Sep 22 07:58:03 2009
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Semaphore;
 
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
@@ -54,7 +55,7 @@
   public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
   static String localHostAddr = null;
   
-  final Object lock = new Object();
+  final Semaphore lock = new Semaphore(1, true);
 
   
   private FileSystem fs = null;
@@ -183,38 +184,39 @@
 
     boolean bailOut = false;
 
-    synchronized (lock) {
-      try {
+     try {
+      lock.acquire();
 
-        FSDataOutputStream previousOutputStr = currentOutputStr;
-        Path previousPath = currentPath;
-        String previousFileName = currentFileName;
-
-        if (previousOutputStr != null) {
-          previousOutputStr.close();
-          if (bytesThisRotate > 0) {
-            log.info("rotating sink file " + previousPath);
-            fs.rename(previousPath, new Path(previousFileName + ".done"));
-          } else {
-            log.info("no chunks written to " + previousPath + ", deleting");
-            fs.delete(previousPath, false);
-          }
+      FSDataOutputStream previousOutputStr = currentOutputStr;
+      Path previousPath = currentPath;
+      String previousFileName = currentFileName;
+
+      if (previousOutputStr != null) {
+        previousOutputStr.close();
+        if (bytesThisRotate > 0) {
+          log.info("rotating sink file " + previousPath);
+          fs.rename(previousPath, new Path(previousFileName + ".done"));
+        } else {
+          log.info("no chunks written to " + previousPath + ", deleting");
+          fs.delete(previousPath, false);
         }
-        Path newOutputPath = new Path(newName + ".chukwa");
-        FSDataOutputStream newOutputStr = fs.create(newOutputPath);
-        currentOutputStr = newOutputStr;
-        currentPath = newOutputPath;
-        currentFileName = newName;
-        bytesThisRotate = 0;
-        // Uncompressed for now
-        seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
-            ChukwaArchiveKey.class, ChunkImpl.class,
-            SequenceFile.CompressionType.NONE, null);
-      } catch (Throwable e) {
-        log.warn("Got an exception in rotate",e);
-        bailOut = true;
-        isRunning = false;
       }
+      Path newOutputPath = new Path(newName + ".chukwa");
+      FSDataOutputStream newOutputStr = fs.create(newOutputPath);
+      currentOutputStr = newOutputStr;
+      currentPath = newOutputPath;
+      currentFileName = newName;
+      bytesThisRotate = 0;
+      // Uncompressed for now
+      seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+          ChukwaArchiveKey.class, ChunkImpl.class,
+          SequenceFile.CompressionType.NONE, null);
+    } catch (Throwable e) {
+      log.warn("Got an exception in rotate",e);
+      bailOut = true;
+      isRunning = false;
+    } finally {
+      lock.release();
     }
     
     if (bailOut) {
@@ -259,37 +261,38 @@
     }
 
     if (chunks != null) {
+      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+      
+      if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+        computeTimePeriod();
+      }
       try {
-        ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-        
-        if (System.currentTimeMillis() >= nextTimePeriodComputation) {
-          computeTimePeriod();
-        }
-        synchronized (lock) {
-          for (Chunk chunk : chunks) {
-            archiveKey.setTimePartition(timePeriod);
-            archiveKey.setDataType(chunk.getDataType());
-            archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
-                + "/" + chunk.getStreamName());
-            archiveKey.setSeqId(chunk.getSeqID());
-
-            if (chunk != null) {
-              // compute size for stats
-              dataSize += chunk.getData().length;
-              bytesThisRotate += chunk.getData().length;
-              seqFileWriter.append(archiveKey, chunk);
-
-              String futureName = currentPath.getName().replace(".chukwa", ".done");
-              result.addPend(futureName, currentOutputStr.getPos());
-            }
+        lock.acquire();
+        for (Chunk chunk : chunks) {
+          archiveKey.setTimePartition(timePeriod);
+          archiveKey.setDataType(chunk.getDataType());
+          archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+              + "/" + chunk.getStreamName());
+          archiveKey.setSeqId(chunk.getSeqID());
+
+          if (chunk != null) {
+            // compute size for stats
+            dataSize += chunk.getData().length;
+            bytesThisRotate += chunk.getData().length;
+            seqFileWriter.append(archiveKey, chunk);
 
+            String futureName = currentPath.getName().replace(".chukwa", ".done");
+            result.addPend(futureName, currentOutputStr.getPos());
           }
-        }// End synchro
+
+        }
       } catch (Throwable e) {
         // We don't want to loose anything
         log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
         DaemonWatcher.bailout(-1);
         isRunning = false;
+      } finally {
+        lock.release();
       }
     }
     return result;
@@ -311,19 +314,20 @@
     // or Collector has received a kill -TERM
   
     try {
-      synchronized(lock) {
-        if (this.currentOutputStr != null) {
-          this.currentOutputStr.close();
-        }
-        if(ENABLE_ROTATION_ON_CLOSE)
-          if(bytesThisRotate > 0)
-            fs.rename(currentPath, new Path(currentFileName + ".done"));
-          else
-            fs.delete(currentPath, false);
-
+      lock.acquire();
+      if (this.currentOutputStr != null) {
+        this.currentOutputStr.close();
       }
+      if(ENABLE_ROTATION_ON_CLOSE)
+        if(bytesThisRotate > 0)
+          fs.rename(currentPath, new Path(currentFileName + ".done"));
+        else
+          fs.delete(currentPath, false);
+
     } catch (Throwable e) {
      log.warn("cannot rename dataSink file:" + currentPath,e);
+    } finally {
+      lock.release();
     }
   }
 



Mime
View raw message