Return-Path: Delivered-To: apmail-hadoop-chukwa-commits-archive@minotaur.apache.org Received: (qmail 9560 invoked from network); 5 Apr 2009 19:38:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 5 Apr 2009 19:38:48 -0000 Received: (qmail 14176 invoked by uid 500); 5 Apr 2009 19:38:48 -0000 Delivered-To: apmail-hadoop-chukwa-commits-archive@hadoop.apache.org Received: (qmail 14166 invoked by uid 500); 5 Apr 2009 19:38:48 -0000 Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: chukwa-dev@hadoop.apache.org Delivered-To: mailing list chukwa-commits@hadoop.apache.org Received: (qmail 14156 invoked by uid 99); 5 Apr 2009 19:38:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Apr 2009 19:38:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Apr 2009 19:38:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 383012388882; Sun, 5 Apr 2009 19:38:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r762139 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Date: Sun, 05 Apr 2009 19:38:18 -0000 To: chukwa-commits@hadoop.apache.org From: asrabkin@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090405193818.383012388882@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: asrabkin Date: Sun Apr 5 19:38:17 2009 New Revision: 762139 URL: http://svn.apache.org/viewvc?rev=762139&view=rev Log: CHUKWA-70. Rewrite FileAdaptor. (contributed by Jerome Boulon) Modified: hadoop/chukwa/trunk/CHANGES.txt hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Modified: hadoop/chukwa/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=762139&r1=762138&r2=762139&view=diff ============================================================================== --- hadoop/chukwa/trunk/CHANGES.txt (original) +++ hadoop/chukwa/trunk/CHANGES.txt Sun Apr 5 19:38:17 2009 @@ -67,6 +67,8 @@ BUG FIXES + CHUKWA-70. Rewrite FileAdaptor. (Jerome Boulon via asrabkin) + CHUKWA-93. Fix NPE in SeqFileWriter. (Jiaqi Tan via asrabkin) CHUKWA-1. Remove lzo job configuration from Chukwa data processors. (Contribute by Jerome Boulon via Eric Yang) Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=762139&r1=762138&r2=762139&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Sun Apr 5 19:38:17 2009 @@ -18,47 +18,137 @@ package org.apache.hadoop.chukwa.datacollection.adaptor; +import java.io.File; +import java.io.RandomAccessFile; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.hadoop.chukwa.ChunkImpl; import org.apache.hadoop.chukwa.datacollection.ChunkReceiver; import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; -import org.apache.hadoop.chukwa.util.RecordConstants; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.ArrayList; + + +class FileAdaptorTailer extends Thread { + static Logger log = Logger.getLogger(FileAdaptorTailer.class); + private List adaptors = null; + private static Configuration conf = null; + private Object lock = new Object(); + + /** + * How often to call each adaptor. + */ + int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 10; + int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS; + + + public FileAdaptorTailer() { + + if (conf == null) { + ChukwaAgent agent = ChukwaAgent.getAgent(); + if (agent != null) { + conf = agent.getConfiguration(); + if (conf != null) { + SAMPLE_PERIOD_MS = conf.getInt( + "chukwaAgent.adaptor.context.switch.time", + DEFAULT_SAMPLE_PERIOD_MS); + } + } + } + + // iterations are much more common than adding a new adaptor + adaptors = new CopyOnWriteArrayList(); + + this.setDaemon(true); + start();// start the FileAdaptorTailer thread + } + @Override + public void run() { + while(true) { + try { + + while (adaptors.size() == 0) { + synchronized (lock) { + try { + log.info("Waiting queue is empty"); + lock.wait(); + } catch (InterruptedException e) { + // do nothing + } + } + } + + long startTime = System.currentTimeMillis(); + for (FileAdaptor adaptor: adaptors) { + log.info("calling this adaptor:" + adaptor.getStreamName()); + adaptor.sendFile(); + } + + long timeToReadFiles = System.currentTimeMillis() - startTime; + if (timeToReadFiles < SAMPLE_PERIOD_MS) { + Thread.sleep(SAMPLE_PERIOD_MS); + } + + }catch (Throwable e) { + log.warn("Exception in FileAdaptorTailer:",e); + } + } + } + + public void addFileAdaptor(FileAdaptor adaptor) { + adaptors.add(adaptor); + synchronized (lock) { + lock.notifyAll(); + } + } + + public void removeFileAdaptor(FileAdaptor adaptor) { + adaptors.remove(adaptor); + } +} /** * File Adaptor push small size file in one chunk to collector */ public class FileAdaptor implements Adaptor { - static Logger log; - - protected static Configuration conf = null; - private int attempts = 0; - - File toWatch; - /** - * next PHYSICAL offset to read - */ + static Logger log = Logger.getLogger(FileAdaptor.class); + static FileAdaptorTailer tailer = null; + + static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000; + static int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD; + + static { + tailer = new FileAdaptorTailer(); + ChukwaAgent agent = ChukwaAgent.getAgent(); + if (agent != null) { + Configuration conf = agent.getConfiguration(); + if (conf != null) { + TIMEOUT_PERIOD = conf.getInt( + "chukwaAgent.adaptor.fileadaptor.timeoutperiod", + DEFAULT_TIMEOUT_PERIOD); + } + } + } + + private long startTime = 0; + private long timeOut = 0; + + + protected File toWatch; + protected RandomAccessFile reader = null; protected long fileReadOffset; protected String type; - private ChunkReceiver dest; - protected RandomAccessFile reader = null; + protected ChunkReceiver dest; protected long adaptorID; - + protected boolean shutdownCalled = false; + /** * The logical offset of the first byte of the file */ private long offsetOfFirstByte = 0; - static { - log = Logger.getLogger(FileAdaptor.class); - } - public void start(long adaptorID, String type, String params, long bytes, ChunkReceiver dest) { // in this case params = filename @@ -67,7 +157,9 @@ this.adaptorID = adaptorID; this.type = type; this.dest = dest; - this.attempts = 0; + this.startTime = System.currentTimeMillis(); + this.timeOut = startTime + TIMEOUT_PERIOD; + String[] words = params.split(" "); if (words.length > 1) { @@ -76,32 +168,87 @@ } else { toWatch = new File(params); } - try { - reader = new RandomAccessFile(toWatch, "r"); - long bufSize = toWatch.length(); - byte[] buf = new byte[(int) bufSize]; - reader.read(buf); - long fileTime = toWatch.lastModified(); - int bytesUsed = extractRecords(dest, 0, buf, fileTime); - } catch (Exception e) { - e.printStackTrace(); + + tailer.addFileAdaptor(this); + } + + void sendFile() { + long now = System.currentTimeMillis() ; + long oneMinAgo = now - (60*1000); + if (toWatch.exists()) { + if (toWatch.lastModified() > oneMinAgo && now < timeOut) { + log.info("Last modified time less than one minute, keep waiting"); + return; + } else { + try { + + long bufSize = toWatch.length(); + byte[] buf = new byte[(int) bufSize]; + + reader = new RandomAccessFile(toWatch, "r"); + reader.read(buf); + reader.close(); + reader = null; + + long fileTime = toWatch.lastModified(); + int bytesUsed = extractRecords(dest, 0, buf, fileTime); + this.fileReadOffset = bytesUsed; + unregisterFromAgent(); + cleanUp(); + }catch(Exception e) { + log.warn("Exception while trying to read: " + toWatch.getAbsolutePath(),e); + } + finally { + if (reader != null) { + try { + reader.close(); + } catch (Exception e) { + // do nothing + } + reader = null; + } + } + } + } else { + if (now > timeOut) { + log.warn("Couldn't read this file: " + toWatch.getAbsolutePath()); + unregisterFromAgent(); + cleanUp() ; + } } + } + + private void cleanUp() { + tailer.removeFileAdaptor(this); + if (reader != null) { + try { + reader.close(); + } catch (Exception e) { + // do nothing + } + reader = null; + } + } + + private void unregisterFromAgent() { ChukwaAgent agent = ChukwaAgent.getAgent(); if (agent != null) { agent.stopAdaptor(adaptorID, false); + } else { - log.info("Agent is null, running in default mode"); + log.warn("Agent is null, cannot unregister " + toWatch.getAbsolutePath()); } - this.fileReadOffset = bytes; + } - + + /** - * Do one last tail, and then stop + * We want to keep trying * * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown() */ public long shutdown() throws AdaptorException { - hardStop(); + // do nothing -- will be automatically done by TimeOut return fileReadOffset + offsetOfFirstByte; } @@ -109,6 +256,7 @@ * Stop tailing the file, effective immediately. */ public void hardStop() throws AdaptorException { + cleanUp(); } public String getStreamName() { @@ -118,19 +266,24 @@ /** * Extract records from a byte sequence * - * @param eq the queue to stick the new chunk[s] in - * @param buffOffsetInFile the byte offset in the stream at which buf[] begins - * @param buf the byte buffer to extract records from + * @param eq + * the queue to stick the new chunk[s] in + * @param buffOffsetInFile + * the byte offset in the stream at which buf[] begins + * @param buf + * the byte buffer to extract records from * @return the number of bytes processed * @throws InterruptedException */ - protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, + protected int extractRecords(final ChunkReceiver eq, long buffOffsetInFile, byte[] buf, long fileTime) throws InterruptedException { - ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), + final ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), buffOffsetInFile + buf.length, buf, this); String tags = chunk.getTags(); chunk.setTags(tags + " time=\"" + fileTime + "\""); + log.info("Adding " + toWatch.getAbsolutePath() + " to the queue"); eq.add(chunk); + log.info( toWatch.getAbsolutePath() + " added to the queue"); return buf.length; } @@ -145,4 +298,4 @@ + " " + fileReadOffset; } -} \ No newline at end of file +}