Return-Path: Delivered-To: apmail-hadoop-chukwa-commits-archive@minotaur.apache.org Received: (qmail 62166 invoked from network); 22 Sep 2009 07:58:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Sep 2009 07:58:37 -0000 Received: (qmail 62058 invoked by uid 500); 22 Sep 2009 07:58:37 -0000 Delivered-To: apmail-hadoop-chukwa-commits-archive@hadoop.apache.org Received: (qmail 62046 invoked by uid 500); 22 Sep 2009 07:58:37 -0000 Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: chukwa-dev@hadoop.apache.org Delivered-To: mailing list chukwa-commits@hadoop.apache.org Received: (qmail 62031 invoked by uid 99); 22 Sep 2009 07:58:36 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Sep 2009 07:58:36 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Sep 2009 07:58:26 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DDEA523888E9; Tue, 22 Sep 2009 07:58:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r817532 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Date: Tue, 22 Sep 2009 07:58:03 -0000 To: chukwa-commits@hadoop.apache.org From: asrabkin@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090922075803.DDEA523888E9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: asrabkin Date: Tue Sep 22 07:58:03 2009 New Revision: 817532 URL: http://svn.apache.org/viewvc?rev=817532&view=rev Log: CHUKWA-392. FIFO queueing of threads in collector. Modified: hadoop/chukwa/trunk/CHANGES.txt hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Modified: hadoop/chukwa/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=817532&r1=817531&r2=817532&view=diff ============================================================================== --- hadoop/chukwa/trunk/CHANGES.txt (original) +++ hadoop/chukwa/trunk/CHANGES.txt Tue Sep 22 07:58:03 2009 @@ -58,6 +58,8 @@ IMPROVEMENTS + CHUKWA-392. FIFO queueing of threads in collector. (asrabkin) + CHUKWA-388. Clean up user interface color. (Eric Yang) CHUKWA-387. Summarize mode for dumpChunks should count bytes. (asrabkin) Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=817532&r1=817531&r2=817532&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Tue Sep 22 07:58:03 2009 @@ -26,6 +26,7 @@ import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Semaphore; import org.apache.hadoop.chukwa.ChukwaArchiveKey; import org.apache.hadoop.chukwa.Chunk; @@ -54,7 +55,7 @@ public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval"; static String localHostAddr = null; - final Object lock = new Object(); + final Semaphore lock = new Semaphore(1, true); private FileSystem fs = null; @@ -183,38 +184,39 @@ boolean bailOut = false; - synchronized (lock) { - try { + try { + lock.acquire(); - FSDataOutputStream previousOutputStr = currentOutputStr; - Path previousPath = currentPath; - String previousFileName = currentFileName; - - if (previousOutputStr != null) { - previousOutputStr.close(); - if (bytesThisRotate > 0) { - log.info("rotating sink file " + previousPath); - fs.rename(previousPath, new Path(previousFileName + ".done")); - } else { - log.info("no chunks written to " + previousPath + ", deleting"); - fs.delete(previousPath, false); - } + FSDataOutputStream previousOutputStr = currentOutputStr; + Path previousPath = currentPath; + String previousFileName = currentFileName; + + if (previousOutputStr != null) { + previousOutputStr.close(); + if (bytesThisRotate > 0) { + log.info("rotating sink file " + previousPath); + fs.rename(previousPath, new Path(previousFileName + ".done")); + } else { + log.info("no chunks written to " + previousPath + ", deleting"); + fs.delete(previousPath, false); } - Path newOutputPath = new Path(newName + ".chukwa"); - FSDataOutputStream newOutputStr = fs.create(newOutputPath); - currentOutputStr = newOutputStr; - currentPath = newOutputPath; - currentFileName = newName; - bytesThisRotate = 0; - // Uncompressed for now - seqFileWriter = SequenceFile.createWriter(conf, newOutputStr, - ChukwaArchiveKey.class, ChunkImpl.class, - SequenceFile.CompressionType.NONE, null); - } catch (Throwable e) { - log.warn("Got an exception in rotate",e); - bailOut = true; - isRunning = false; } + Path newOutputPath = new Path(newName + ".chukwa"); + FSDataOutputStream newOutputStr = fs.create(newOutputPath); + currentOutputStr = newOutputStr; + currentPath = newOutputPath; + currentFileName = newName; + bytesThisRotate = 0; + // Uncompressed for now + seqFileWriter = SequenceFile.createWriter(conf, newOutputStr, + ChukwaArchiveKey.class, ChunkImpl.class, + SequenceFile.CompressionType.NONE, null); + } catch (Throwable e) { + log.warn("Got an exception in rotate",e); + bailOut = true; + isRunning = false; + } finally { + lock.release(); } if (bailOut) { @@ -259,37 +261,38 @@ } if (chunks != null) { + ChukwaArchiveKey archiveKey = new ChukwaArchiveKey(); + + if (System.currentTimeMillis() >= nextTimePeriodComputation) { + computeTimePeriod(); + } try { - ChukwaArchiveKey archiveKey = new ChukwaArchiveKey(); - - if (System.currentTimeMillis() >= nextTimePeriodComputation) { - computeTimePeriod(); - } - synchronized (lock) { - for (Chunk chunk : chunks) { - archiveKey.setTimePartition(timePeriod); - archiveKey.setDataType(chunk.getDataType()); - archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() - + "/" + chunk.getStreamName()); - archiveKey.setSeqId(chunk.getSeqID()); - - if (chunk != null) { - // compute size for stats - dataSize += chunk.getData().length; - bytesThisRotate += chunk.getData().length; - seqFileWriter.append(archiveKey, chunk); - - String futureName = currentPath.getName().replace(".chukwa", ".done"); - result.addPend(futureName, currentOutputStr.getPos()); - } + lock.acquire(); + for (Chunk chunk : chunks) { + archiveKey.setTimePartition(timePeriod); + archiveKey.setDataType(chunk.getDataType()); + archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() + + "/" + chunk.getStreamName()); + archiveKey.setSeqId(chunk.getSeqID()); + + if (chunk != null) { + // compute size for stats + dataSize += chunk.getData().length; + bytesThisRotate += chunk.getData().length; + seqFileWriter.append(archiveKey, chunk); + String futureName = currentPath.getName().replace(".chukwa", ".done"); + result.addPend(futureName, currentOutputStr.getPos()); } - }// End synchro + + } } catch (Throwable e) { // We don't want to loose anything log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e); DaemonWatcher.bailout(-1); isRunning = false; + } finally { + lock.release(); } } return result; @@ -311,19 +314,20 @@ // or Collector has received a kill -TERM try { - synchronized(lock) { - if (this.currentOutputStr != null) { - this.currentOutputStr.close(); - } - if(ENABLE_ROTATION_ON_CLOSE) - if(bytesThisRotate > 0) - fs.rename(currentPath, new Path(currentFileName + ".done")); - else - fs.delete(currentPath, false); - + lock.acquire(); + if (this.currentOutputStr != null) { + this.currentOutputStr.close(); } + if(ENABLE_ROTATION_ON_CLOSE) + if(bytesThisRotate > 0) + fs.rename(currentPath, new Path(currentFileName + ".done")); + else + fs.delete(currentPath, false); + } catch (Throwable e) { log.warn("cannot rename dataSink file:" + currentPath,e); + } finally { + lock.release(); } }